🚨 Latest Research:Tanstack npm Packages Compromised in Ongoing Mini Shai-Hulud Supply-Chain Attack.Learn More
Socket
Book a DemoSign in
Socket

processing

Package Overview
Dependencies
Maintainers
2
Versions
18
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

processing - pypi Package Compare versions

Comparing version
0.37
to
0.38
+358
queue.py
#
# Module implementing queues
#
# processing/queue.py
#
# Copyright (c) 2006, 2007, R Oudkerk --- see COPYING.txt
#
__all__ = ['Queue', 'SimpleQueue']
import sys
import os
import threading
import collections
import time
import atexit
import weakref
from processing import _processing, process, Pipe
from processing.synchronize import Lock, BoundedSemaphore
from processing.process import debug, Finalize
from Queue import Empty, Full
#
# Ensure cleanup func of `processing` runs before that of `threading`
#
atexit._exithandlers.remove((process._exit_func, (), {}))
atexit._exithandlers.append((process._exit_func, (), {}))
#
# Queue type based on a pipe -- uses a buffer and a thread
#
class Queue(object):
def __init__(self, maxsize=0):
if sys.platform == 'win32':
from processing.connection import Listener, Client
l = Listener()
reader = Client(l.address)
writer = l.accept()
else:
rfd, wfd = os.pipe()
reader = _processing.SocketConnection(rfd)
writer = _processing.SocketConnection(wfd)
os.close(rfd), os.close(wfd)
rlock = Lock()
wlock = Lock()
if maxsize == 0:
sem = None
else:
sem = BoundedSemaphore(maxsize)
state = maxsize, reader, writer, rlock, wlock, sem, os.getpid()
self.__setstate__(state)
def __setstate__(self, state):
(self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._closed = False
self._close = None
self._jointhread = None
self._send = self._writer.send
self._recv = self._reader.recv
self._poll = self._reader.poll
self._notempty = threading.Condition(threading.Lock())
self._buffer = collections.deque()
self._thread = None
def __getstate__(self):
return (self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
def put(self, obj, block=True, timeout=None):
assert not self._closed
if self._maxsize:
if block and timeout is None:
if self._sem:
self._sem.acquire()
else:
if not block:
timeout = 0.0
else:
timeout = max(0.0, timeout)
if not self._sem._block.acquire_timeout(timeout):
raise Full
self._notempty.acquire()
try:
if self._thread is None:
self._startthread()
self._buffer.append(obj)
self._notempty.notify()
finally:
self._notempty.release()
def putmany(self, iterable):
assert not self._closed
assert self._maxsize == 0
self._notempty.acquire()
try:
if self._thread is None:
self._startthread()
self._buffer.extend(iterable)
self._notempty.notify()
finally:
self._notempty.release()
def get(self, block=1, timeout=None):
if block and timeout is None:
self._rlock.acquire()
try:
res = self._recv()
if self._sem:
self._sem.release()
return res
finally:
self._rlock.release()
else:
if not block:
timeout = 0.0
else:
timeout = max(0.0, timeout)
deadline = time.time() + timeout
if not self._rlock._block.acquire_timeout(timeout):
raise Empty
try:
timeout = max(0.0, deadline - time.time())
if not self._poll(timeout):
raise Empty
res = self._recv()
if self._sem:
self._sem.release()
return res
finally:
self._rlock.release()
def qsize(self):
raise NotImplementedError
def empty(self):
return not self._poll()
def full(self):
return bool(self._wsem) and self._wsem._block._getvalue() == 0
def get_nowait(self, obj):
return self.get(obj, False)
def put_nowait(self, obj):
return self.put(obj, False)
def close(self):
if self._close:
self._close()
self._closed = True
def jointhread(self):
assert self._closed
if self._jointhread:
self._jointhread()
def canceljoin(self):
self._jointhread.cancel()
def _startthread(self):
# Start thread which transfers data from buffer to pipe
self._thread = threading.Thread(
target=Queue._feed,
args=[self._buffer, self._notempty, self._send, self._wlock]
)
self._thread.setDaemon(True)
self._thread.start()
# Send sentinel to the thread queue object is garbage collected
self._close = Finalize(
self, Queue._finalize_close,
[self._buffer, self._notempty],
priority=10, atexit=True
)
# Flush data to pipe
self._jointhread = Finalize(
self._thread, Queue._finalize_join,
[weakref.ref(self._thread)],
atexit=True
)
# Don't bother joining on exit if this process created the queue
if self._opid == os.getpid():
self._jointhread.cancel()
@staticmethod
def _finalize_join(twr):
debug('joining queue thread')
thread = twr()
if thread is not None:
thread.join()
debug('... queue thread joined')
else:
debug('... queue thread already dead')
@staticmethod
def _finalize_close(buffer, notempty):
debug('telling thread used by a buffered queue to quit')
notempty.acquire()
try:
buffer.append(_sentinel)
notempty.notify()
finally:
notempty.release()
@staticmethod
def _feed(buffer, notempty, send, writelock):
debug('starting thread to feed data to pipe')
while 1:
notempty.acquire()
try:
if not buffer:
notempty.wait()
finally:
notempty.release()
try:
while 1:
obj = buffer.popleft()
if obj is _sentinel:
debug('feeder thread got sentinel -- exiting')
return
writelock.acquire()
try:
send(obj)
finally:
writelock.release()
except IndexError:
pass
_sentinel = object()
#
# Simplified Queue type -- really just a locked pipe
#
class SimpleQueue(object):
def __init__(self):
if sys.platform == 'win32':
from processing.connection import Listener, Client
l = Listener()
reader = Client(l.address)
writer = l.accept()
state = reader, writer, Lock(), None
else:
rfd, wfd = os.pipe()
reader = _processing.SocketConnection(rfd)
writer = _processing.SocketConnection(wfd)
os.close(rfd), os.close(wfd)
state = reader, writer, Lock(), Lock()
self.__setstate__(state)
def empty(self):
return not self._reader.poll()
def __setstate__(self, state):
self._reader, self._writer, self._rlock, self._wlock = state
recv = self._reader.recv
racquire, rrelease = self._rlock.acquire, self._rlock.release
def get():
racquire()
try:
return recv()
finally:
rrelease()
self.get = get
if self._wlock is None:
# writes to a message oriented win32 pipe are atomic
self.put = self._writer.send
else:
send = self._writer.send
wacquire, wrelease = self._wlock.acquire, self._wlock.release
def put(obj):
wacquire()
try:
return send(obj)
finally:
wrelease()
self.put = put
def __getstate__(self):
# state will only be picklable on windows
return self._reader, self._writer, self._rlock, self._wlock
#
# Queue type based on a posix queue
#
if hasattr(_processing, 'Queue'):
class PosixQueue(_processing.Queue):
_count = 0
_count_lock = threading.RLock()
_defaults = None
def __init__(self, maxsize=0, msgsize=0):
assert maxsize >= 0 and msgsize >= 0
PosixQueue._count_lock.acquire()
try:
PosixQueue._count += 1
count = PosixQueue._count
finally:
PosixQueue._count_lock.release()
_name = '/pyq-%s-%s' % (os.getpid(), count)
if (maxsize, msgsize) != (0, 0):
defmaxsize, defmsgsize = self.getdefaults()
maxsize = maxsize or defmaxsize
msgsize = msgsize or defmsgsize
_processing.Queue.__init__(self, maxsize, msgsize, _name, True)
# We immediately unlink the name of the queue since
# otherwise the queue might not get removed (till the next
# reboot) if python gets killed. This means that `Queue`
# objects are not picklable, but that does not prevent a
# child process from using a `Queue` object inherited from
# its parent.
self._unlink()
def get_nowait(self):
return self.get(False)
def put_nowait(self, item):
return self.put(item, False)
@staticmethod
def getdefaults():
PosixQueue._count_lock.acquire()
try:
if PosixQueue._defaults is None:
temp = PosixQueue()
PosixQueue._defaults = (temp._maxmsg, temp._msgsize)
temp._close()
return PosixQueue._defaults
finally:
PosixQueue._count_lock.release()
__all__ += ['PosixQueue']
+14
-26

@@ -39,3 +39,3 @@ #

__version__ = '0.37'
__version__ = '0.38'

@@ -85,3 +85,3 @@ __all__ = [

from process import freezeSupport, ProcessExit, Finalize
from process import getLogger, enableLogging
from process import getLogger, enableLogging, note

@@ -153,5 +153,4 @@ #

__all__ += [
'LocalManager', 'Lock', 'RLock', 'Semaphore',
'BoundedSemaphore', 'Condition', 'Event',
'PipeQueue', 'BufferedPipeQueue', 'Queue', 'BufferedQueue'
'LocalManager', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
'Condition', 'Event', 'Queue', 'SimpleQueue'
]

@@ -208,18 +207,17 @@

def PipeQueue(maxsize=0):
def Queue(maxsize=0):
'''
Returns a queue object implemented using a pipe
'''
from processing.synchronize import PipeQueue
return PipeQueue(maxsize)
from processing.queue import Queue
return Queue(maxsize)
def BufferedPipeQueue(maxsize=0):
def SimpleQueue():
'''
Returns a queue for which `put()` always succeeds without blocking
Returns a simplified queue object implemented using a pipe
'''
from processing.synchronize import BufferedPipeQueue
return BufferedPipeQueue(maxsize)
from processing.queue import SimpleQueue
return SimpleQueue()
Queue = PipeQueue
BufferedQueue = BufferedPipeQueue
PipeQueue = BufferedPipeQueue = BufferedQueue = Queue # deprecrated names

@@ -229,3 +227,3 @@

__all__ += ['PosixQueue', 'BufferedPosixQueue']
__all__ += ['PosixQueue']

@@ -236,13 +234,3 @@ def PosixQueue(maxsize=0, msgsize=0):

'''
from processing.synchronize import PosixQueue
from processing.queue import PosixQueue
return PosixQueue(maxsize, msgsize)
def BufferedPosixQueue(maxsize=0, msgsize=0):
'''
Returns a queue for which `put()` always succeeds without blocking
'''
from processing.synchronize import BufferedPosixQueue
return BufferedPosixQueue(maxsize)
Queue = PosixQueue
BufferedQueue = BufferedPosixQueue

@@ -5,2 +5,61 @@ ========================================

Changes in 0.38
---------------
* Have revamped the queue types. Now the queue types are
`Queue`, `SimpleQueue` and (on systems which support it)
`PosixQueue`.
Now `Queue` should behave just like Python's normal `Queue.Queue`
class except that `qsize()`, `task_done()` and `join()` are not
implemented. In particular, if no maximum size was specified when
the queue was created then `put()` will always succeed without
blocking.
A `SimpleQueue` instance is really just a pipe protected by a couple
of locks. It has `get()`, `put()` and `empty()` methods but does
not not support timeouts or non-blocking.
`BufferedPipeQueue()` and `PipeQueue()` remain as deprecated
aliases of `Queue()` but `BufferedPosixQueue()` has been removed.
(Not sure if we really need to keep `PosixQueue()`...)
* Previously the `Pool.shutdown()` method was a little dodgy -- it
could block indefinitely if `map()` or `imap*()` were used and did
not try to terminate workers while they were doing a task.
Now there are three new methods `close()`, `terminate()` and
`join()` -- `shutdown()` is retained as a deprecated alias of
`terminate()`. Thanks to Gerald John M. Manipon for feature
request/suggested patch to `shutdown()`.
* `Pool.imap()` and `Pool.imap_unordered()` has gained a `chunksize`
argument which allows the iterable to be submitted to the pool in
chunks. Choosing `chunksize` appropriately makes `Pool.imap()`
almost as fast as `Pool.map()` even for long iterables and cheap
functions.
* Previously on Windows when the cleanup code for a `LocalManager`
attempts to unlink the name of the file which backs the shared
memory map an exception is raised if a child process still exists
which has a handle open for that mmap. This is likely to happen if
a daemon process inherits a `LocalManager` instance.
Now the parent process will remember the filename and attempt to
unlink the file name again once all the child processes have been
joined or terminated. Reported by Paul Rudin.
* `types.MethodType` is registered with `copy_reg` so now instance
methods and class methods should be picklable. (Unfortunately there is
no obvious way of supporting the pickling of staticmethods since
they are not marked with the class in which they were defined.)
This means that on Windows it is now possible to use an instance
method or class method as the target callable of a Process object.
* On Windows `reduction.fromfd()` now returns true instances of
`_socket.socket`, so there is no more need for the
`_processing.falsesocket` type.
Changes in 0.37

@@ -7,0 +66,0 @@ ---------------

@@ -19,3 +19,3 @@ #

from cPickle import dumps, loads
from processing.process import Finalize, currentProcess, log
from processing.process import Finalize, currentProcess, subdebug

@@ -301,3 +301,3 @@ #

log(5, 'listener has bound address %r', self._address)
subdebug('listener has bound address %r', self._address)

@@ -316,5 +316,5 @@ self.close = Finalize(

# staticmethod
@staticmethod
def _finalize_socketlistener(address, family, sock):
log(5, 'closing listener with address=%r', address)
subdebug('closing listener with address=%r', address)
sock.close()

@@ -326,3 +326,2 @@ if family == 'AF_UNIX':

pass
_finalize_socketlistener = staticmethod(_finalize_socketlistener)

@@ -374,3 +373,3 @@ def SocketClient(address):

log(5, 'listener created with address=%r', self._address)
subdebug('listener created with address=%r', self._address)

@@ -392,8 +391,7 @@ self.close = Finalize(

# staticmethod
@staticmethod
def _finalize_pipelistener(queue, address):
log(5, 'closing listener with address=%r', address)
subdebug('closing listener with address=%r', address)
for handle in queue:
_processing.CloseHandle(handle)
_finalize_pipelistener = staticmethod(_finalize_pipelistener)

@@ -400,0 +398,0 @@ def PipeClient(address):

@@ -98,3 +98,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<dl class="docutils">
<dt><strong>Queues</strong></dt>
<dt><strong>Queues</strong>:</dt>
<dd><p class="first">The function <tt class="docutils literal"><span class="pre">Queue()</span></tt> returns a near clone of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt>

@@ -122,3 +122,3 @@ -- see the Python standard documentation. For example</p>

</dd>
<dt><strong>Pipes</strong></dt>
<dt><strong>Pipes</strong>:</dt>
<dd><p class="first">The <tt class="docutils literal"><span class="pre">Pipe()</span></tt> function returns a pair of connection objects

@@ -292,9 +292,7 @@ connected by a duplex (two-way) pipe, for example</p>

not significantly slower than passing the same objects between threads
using <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt>. (In fact on Linux it tends to be faster.) One
can also use <tt class="docutils literal"><span class="pre">processing.Pipe</span></tt> which at least on Windows is quite a
bit faster than <tt class="docutils literal"><span class="pre">processing.Queue</span></tt>.</p>
using <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt>. One can also use <tt class="docutils literal"><span class="pre">processing.Pipe</span></tt>.</p>
<p>The following benchmarks were performed on a single core Pentium 4,
2.5Ghz laptop running Windows XP and Ubuntu Linux 6.10 --- see
<a class="reference" href="../test/test_speed.py">test_speed.py</a>.</p>
<p><em>Number of 256 byte strings passed between processes/threads per sec</em>:</p>
<p><em>Number of 256 byte string objects passed between processes/threads per sec</em>:</p>
<table border="1" class="docutils">

@@ -315,12 +313,16 @@ <colgroup>

<td>49,000</td>
<td>17,000-50,000 <a class="footnote-reference" href="#id4" id="id1" name="id1">[1]</a></td>
<td>17,000-50,000 <a class="footnote-reference" href="#id2" id="id1" name="id1">[1]</a></td>
</tr>
<tr><td>processing.Queue</td>
<td>22,000</td>
<td>21,000</td>
</tr>
<tr><td>processing.SimpleQueue</td>
<td>33,000</td>
<td>43,000</td>
</tr>
<tr><td>processing.PosixQueue</td>
<td>n/a</td>
<td>62,000 <a class="footnote-reference" href="#id5" id="id2" name="id2">[2]</a></td>
<td>62,000</td>
</tr>
<tr><td>processing.PipeQueue</td>
<td>27,000</td>
<td>37,000 <a class="footnote-reference" href="#id5" id="id3" name="id3">[2]</a></td>
</tr>
<tr><td>Queue managed by server</td>

@@ -336,16 +338,9 @@ <td>6,900</td>

</table>
<table class="docutils footnote" frame="void" id="id4" rules="none">
<table class="docutils footnote" frame="void" id="id2" rules="none">
<colgroup><col class="label" /><col /></colgroup>
<tbody valign="top">
<tr><td class="label"><a class="fn-backref" href="#id1" name="id4">[1]</a></td><td>For some reason the performance of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> is very
<tr><td class="label"><a class="fn-backref" href="#id1" name="id2">[1]</a></td><td>For some reason the performance of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> is very
variable on Linux.</td></tr>
</tbody>
</table>
<table class="docutils footnote" frame="void" id="id5" rules="none">
<colgroup><col class="label" /><col /></colgroup>
<tbody valign="top">
<tr><td class="label"><a name="id5">[2]</a></td><td><em>(<a class="fn-backref" href="#id2">1</a>, <a class="fn-backref" href="#id3">2</a>)</em> <tt class="docutils literal"><span class="pre">processing.Queue</span></tt> is an alias for <tt class="docutils literal"><span class="pre">processing.PosixQueue</span></tt> if
available or <tt class="docutils literal"><span class="pre">processing.PipeQueue</span></tt>.</td></tr>
</tbody>
</table>
<p><em>Number of acquires/releases of a lock per sec</em>:</p>

@@ -352,0 +347,0 @@ <table border="1" class="docutils">

@@ -306,5 +306,3 @@ .. include:: header.txt

not significantly slower than passing the same objects between threads
using `Queue.Queue`. (In fact on Linux it tends to be faster.) One
can also use `processing.Pipe` which at least on Windows is quite a
bit faster than `processing.Queue`.
using `Queue.Queue`. One can also use `processing.Pipe`.

@@ -316,3 +314,3 @@ The following benchmarks were performed on a single core Pentium 4,

*Number of 256 byte strings passed between processes/threads per sec*:
*Number of 256 byte string objects passed between processes/threads per sec*:

@@ -323,4 +321,5 @@ ================================== ========== ==================

Queue.Queue 49,000 17,000-50,000 [1]_
processing.PosixQueue n/a 62,000 [2]_
processing.PipeQueue 27,000 37,000 [2]_
processing.Queue 22,000 21,000
processing.SimpleQueue 33,000 43,000
processing.PosixQueue n/a 62,000
Queue managed by server 6,900 6,500

@@ -332,4 +331,2 @@ processing.Pipe 52,000 57,000

variable on Linux.
.. [2] `processing.Queue` is an alias for `processing.PosixQueue` if
available or `processing.PipeQueue`.

@@ -336,0 +333,0 @@

@@ -44,3 +44,5 @@ <?xml version="1.0" encoding="utf-8" ?>

accepts a single argument. When the result becomes ready
<tt class="docutils literal"><span class="pre">callback</span></tt> is applied to it (unless the call failed).</p>
<tt class="docutils literal"><span class="pre">callback</span></tt> is applied to it (unless the call failed).
<tt class="docutils literal"><span class="pre">callback</span></tt> should complete immediately since otherwise the
thread which handles the results will get blocked.</p>
</dd>

@@ -52,3 +54,3 @@ <dt><tt class="docutils literal"><span class="pre">map(func,</span> <span class="pre">iterable,</span> <span class="pre">chunksize=None)</span></tt></dt>

it submits to the process pool as separate tasks. The
(maximum) size of these chunks can be specified by setting
(approximate) size of these chunks can be specified by setting
<tt class="docutils literal"><span class="pre">chunksize</span></tt> to a positive integer.</p>

@@ -61,17 +63,19 @@ </dd>

accepts a single argument. When the result becomes ready
<tt class="docutils literal"><span class="pre">callback</span></tt> is applied to it (unless the call failed).</p>
<tt class="docutils literal"><span class="pre">callback</span></tt> is applied to it (unless the call failed).
<tt class="docutils literal"><span class="pre">callback</span></tt> should complete immediately since otherwise the
thread which handles the results will get blocked.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">imap(func,</span> <span class="pre">iterable)</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">imap(func,</span> <span class="pre">iterable,</span> <span class="pre">chunksize=1)</span></tt></dt>
<dd><p class="first">An equivalent of <tt class="docutils literal"><span class="pre">itertools.imap()</span></tt>.</p>
<p>Note that this treats each element of the iterable as a
separate task, so for a long iterable where the function is
cheap to evaluate this likely to be <strong>much</strong> slower than using
<tt class="docutils literal"><span class="pre">map()</span></tt>.</p>
<p class="last">Also notice that the <tt class="docutils literal"><span class="pre">next(timeout=None)</span></tt> method of the
iterator returned by the <tt class="docutils literal"><span class="pre">imap()</span></tt> method has a timeout
parameter. <tt class="docutils literal"><span class="pre">next(timeout)</span></tt> will raise
<p>The <tt class="docutils literal"><span class="pre">chunksize</span></tt> argument is the same as the one used by the
<tt class="docutils literal"><span class="pre">map()</span></tt> method. For very long iterables using a large value
for <tt class="docutils literal"><span class="pre">chunksize</span></tt> can make make the job complete <strong>much</strong> faster
than using the default value of <tt class="docutils literal"><span class="pre">1</span></tt>.</p>
<p class="last">Also if <tt class="docutils literal"><span class="pre">chunksize</span></tt> is <tt class="docutils literal"><span class="pre">1</span></tt> then the <tt class="docutils literal"><span class="pre">next()</span></tt> method of the
iterator returned by the <tt class="docutils literal"><span class="pre">imap()</span></tt> method has a <tt class="docutils literal"><span class="pre">timeout</span></tt>
parameter: <tt class="docutils literal"><span class="pre">next(timeout)</span></tt> will raise
<tt class="docutils literal"><span class="pre">processing.TimeoutError</span></tt> if the result cannot be returned
with <tt class="docutils literal"><span class="pre">timeout</span></tt> seconds.</p>
within <tt class="docutils literal"><span class="pre">timeout</span></tt> seconds.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">imap_unordered(func,</span> <span class="pre">iterable)</span></tt></dt>
<dt><tt class="docutils literal"><span class="pre">imap_unordered(func,</span> <span class="pre">iterable,</span> <span class="pre">chunksize=1)</span></tt></dt>
<dd>The same as <tt class="docutils literal"><span class="pre">imap()</span></tt> except that the ordering of the results

@@ -81,6 +85,13 @@ from the returned iterator should be considered arbitrary.

guaranteed to be &quot;correct&quot;.)</dd>
<dt><tt class="docutils literal"><span class="pre">shutdown()</span></tt></dt>
<dd>Tells the worker processes to shutdown. Gets called
automatically when the pool object is garbage collected or
when the process shuts down.</dd>
<dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt>
<dd>Prevents any more tasks from being submitted to the pool.
Once all the tasks have been completed the worker processes
will exit.</dd>
<dt><tt class="docutils literal"><span class="pre">terminate()</span></tt></dt>
<dd>Stops the worker processes immediately without completing
outstanding work. When the pool object is garbage collected
<tt class="docutils literal"><span class="pre">terminate()</span></tt> will be called immediately.</dd>
<dt><tt class="docutils literal"><span class="pre">join()</span></tt></dt>
<dd>Wait for the worker processes to exit. One must call
<tt class="docutils literal"><span class="pre">close()</span></tt> or <tt class="docutils literal"><span class="pre">terminate()</span></tt> before using <tt class="docutils literal"><span class="pre">join()</span></tt>.</dd>
</dl>

@@ -87,0 +98,0 @@ </blockquote>

@@ -37,2 +37,4 @@ .. include:: header.txt

`callback` is applied to it (unless the call failed).
`callback` should complete immediately since otherwise the
thread which handles the results will get blocked.

@@ -45,3 +47,3 @@ `map(func, iterable, chunksize=None)`

it submits to the process pool as separate tasks. The
(maximum) size of these chunks can be specified by setting
(approximate) size of these chunks can be specified by setting
`chunksize` to a positive integer.

@@ -56,18 +58,20 @@

`callback` is applied to it (unless the call failed).
`callback` should complete immediately since otherwise the
thread which handles the results will get blocked.
`imap(func, iterable)`
`imap(func, iterable, chunksize=1)`
An equivalent of `itertools.imap()`.
Note that this treats each element of the iterable as a
separate task, so for a long iterable where the function is
cheap to evaluate this likely to be **much** slower than using
`map()`.
The `chunksize` argument is the same as the one used by the
`map()` method. For very long iterables using a large value
for `chunksize` can make make the job complete **much** faster
than using the default value of `1`.
Also notice that the `next(timeout=None)` method of the
iterator returned by the `imap()` method has a timeout
parameter. `next(timeout)` will raise
Also if `chunksize` is `1` then the `next()` method of the
iterator returned by the `imap()` method has an optional
`timeout` parameter: `next(timeout)` will raise
`processing.TimeoutError` if the result cannot be returned
with `timeout` seconds.
within `timeout` seconds.
`imap_unordered(func, iterable)`
`imap_unordered(func, iterable, chunksize=1)`
The same as `imap()` except that the ordering of the results

@@ -78,8 +82,17 @@ from the returned iterator should be considered arbitrary.

`shutdown()`
Tells the worker processes to shutdown. Gets called
automatically when the pool object is garbage collected or
when the process shuts down.
`close()`
Prevents any more tasks from being submitted to the pool.
Once all the tasks have been completed the worker processes
will exit.
`terminate()`
Stops the worker processes immediately without completing
outstanding work. When the pool object is garbage collected
`terminate()` will be called immediately.
`join()`
Wait for the worker processes to exit. One must call
`close()` or `terminate()` before using `join()`.
Asynchronous result objects

@@ -86,0 +99,0 @@ ===========================

@@ -31,3 +31,3 @@ <?xml version="1.0" encoding="utf-8" ?>

<dt><strong>exception</strong> <tt class="docutils literal"><span class="pre">BufferTooShort</span></tt></dt>
<dd><p class="first">Exception raise by the <tt class="docutils literal"><span class="pre">recvbytes_into()</span></tt> method of a
<dd><p class="first">Exception raised by the <tt class="docutils literal"><span class="pre">recvbytes_into()</span></tt> method of a
<a class="reference" href="connection-ref.html#connection-objects">connection object</a>

@@ -76,71 +76,79 @@ when the supplied buffer object is too small for the message

same time.</p>
<p>Also note that if a process is killed while it is trying to
read or write to a pipe then the data in the pipe is likely to
become corrupted (because it may become impossible to be sure
where the message boundaries lie).</p>
<p class="last">On Windows this requires the <tt class="docutils literal"><span class="pre">_processing</span></tt> extension.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">Queue(maxsize=0)</span></tt></dt>
<dd><p class="first">Alias for either <tt class="docutils literal"><span class="pre">PosixQueue</span></tt> if available or else <tt class="docutils literal"><span class="pre">PipeQueue</span></tt>
-- see below.</p>
<p class="last">It differs from Python standard <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> type by only
have finite capacity, so even when <tt class="docutils literal"><span class="pre">maxsize</span></tt> is specified as
<tt class="docutils literal"><span class="pre">0</span></tt> the <tt class="docutils literal"><span class="pre">put()</span></tt> method might block. If you need to be sure
that <tt class="docutils literal"><span class="pre">put()</span></tt> will not block then you should use
<tt class="docutils literal"><span class="pre">BufferedQueue()</span></tt> instead.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">BufferedQueue()</span></tt></dt>
<dd><p class="first">Alias for either <tt class="docutils literal"><span class="pre">BufferedPosixQueue</span></tt> if available or else
<tt class="docutils literal"><span class="pre">BufferedPipeQueue</span></tt>.</p>
<p>Differs from <tt class="docutils literal"><span class="pre">Queue()</span></tt> by guaranteeing that the <tt class="docutils literal"><span class="pre">put()</span></tt> method
will not block. A buffered queue tends to be slower than a
normal queue if you are putting items on the queue one by one
(rather than using <tt class="docutils literal"><span class="pre">putmany()</span></tt>).</p>
<p>The first time a process puts an item on a buffered queue a
thread is started whose job is to transfer items from a buffer
onto the true interprocess queue. In addition to the usual
queue methods <tt class="docutils literal"><span class="pre">BufferedQueue</span></tt> supports two extra:</p>
<blockquote class="last">
<dd><p class="first">Returns a process shared queue implemented using a pipe and a
few locks/semaphores. A background thread transfers objects
from a buffer into the pipe.</p>
<p>It is a near clone of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> except that the <tt class="docutils literal"><span class="pre">qsize()</span></tt>
method is not implemented and that the <tt class="docutils literal"><span class="pre">task_done()</span></tt> and
<tt class="docutils literal"><span class="pre">join()</span></tt> methods introduced in Python 2.5 are also missing.</p>
<p><tt class="docutils literal"><span class="pre">Queue</span></tt> has a few additional methods:</p>
<blockquote>
<dl class="docutils">
<dt><tt class="docutils literal"><span class="pre">putmany(iterable)</span></tt></dt>
<dd>Adds all items in the iterable to the queue's buffer.
So <tt class="docutils literal"><span class="pre">q.putmany(X)</span></tt> is a faster equivalent of
<tt class="docutils literal"><span class="pre">for</span> <span class="pre">x</span> <span class="pre">in</span> <span class="pre">X:</span> <span class="pre">q.put(x)</span></tt>.</dd>
<dd>If the queue has infinite size then this adds all
items in the iterable to the queue's buffer. So
<tt class="docutils literal"><span class="pre">q.putmany(X)</span></tt> is a faster alternative to <tt class="docutils literal"><span class="pre">for</span> <span class="pre">x</span> <span class="pre">in</span> <span class="pre">X:</span>
<span class="pre">q.put(x)</span></tt>. Raises an error if the queue has finite
size.</dd>
<dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt>
<dd>Flushes data from the buffer to the interprocess
queue, then instructs the thread to stop and waits for
it to do so. This will be called automatically when
the queue is garbage collected or when the process
exits.</dd>
<dd>Indicates that no more data will be put on this queue
by the current process. The background thread will
quit once it has flushed all it data. This is called
automatically when the queue is garbage collected.</dd>
<dt><tt class="docutils literal"><span class="pre">jointhread()</span></tt></dt>
<dd><p class="first">This joins the background thread and can only be used
after <tt class="docutils literal"><span class="pre">close()</span></tt> has been called. This blocks until
the background thread exits, ensuring that all data in
the buffer has been flushed to the pipe.</p>
<p class="last">By default if a process is not the creator of the
queue then on exit it will attempt to join the queue's
background thread. The process can call
<tt class="docutils literal"><span class="pre">canceljoin()</span></tt> to prevent this behaviour.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">canceljoin()</span></tt></dt>
<dd>Prevents the background thread from being joined
automatically when the process exits. Unnecessary if
the current process created the queue.</dd>
</dl>
</blockquote>
<p class="last">Note that if a process is killed while it is trying to receive
or send to a queue then the data in the queue is likely to
become corrupted (because it may become impossible to be sure
where the message boundaries lie).</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">PipeQueue(maxsize=0)</span></tt></dt>
<dd><p class="first">Returns a near clone of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> except that the
<tt class="docutils literal"><span class="pre">qsize()</span></tt> method is not implemented. It is implemented using
a pipe and some locks/semaphores.</p>
<p>On Unix if a client terminates while it is reading or writing
from the queue, other clients reading from the queue may lose
track of where messages boundaries are or may retrieve
incomplete messages. At least on Windows a keyboard interrupt
(SIGINT) or the use of a process's <tt class="docutils literal"><span class="pre">stop()</span></tt> method should not
cause such a problem.</p>
<p>Placing an object on a <tt class="docutils literal"><span class="pre">PipeQueue</span></tt> can block because of lack
of buffer space even if a zero timeout is used.</p>
<p class="last">Requires support for native semaphore support from <tt class="docutils literal"><span class="pre">_processing</span></tt>.</p>
<dt><tt class="docutils literal"><span class="pre">SimpleQueue()</span></tt></dt>
<dd><p class="first">A simplified and faster alternative to <tt class="docutils literal"><span class="pre">Queue()</span></tt>. It is
really just a pipe protected by a couple of locks.</p>
<p>It has <tt class="docutils literal"><span class="pre">get()</span></tt> and <tt class="docutils literal"><span class="pre">put()</span></tt> methods but these do not have
<tt class="docutils literal"><span class="pre">block</span></tt> or <tt class="docutils literal"><span class="pre">timeout</span></tt> arguments. It also has an <tt class="docutils literal"><span class="pre">empty()</span></tt>
method.</p>
<p class="last"><strong>Warning</strong>: Unlike with <tt class="docutils literal"><span class="pre">Queue</span></tt> (when <tt class="docutils literal"><span class="pre">maxsize</span></tt> is zero)
using <tt class="docutils literal"><span class="pre">put()</span></tt> may block if the pipe's buffer does not have
sufficient space, so one must take care that no deadlocks are
possible.</p>
</dd>
<dt><tt class="docutils literal"><span class="pre">PosixQueue(maxsize=0,</span> <span class="pre">msgsize=0)</span></tt></dt>
<dd><p class="first">Returns a near clone of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> implemented using a (Unix
only) posix message queue.</p>
<p>If <tt class="docutils literal"><span class="pre">maxsize</span></tt> is non-zero it determines the maximum number of
messages that can be in the queue and if <tt class="docutils literal"><span class="pre">msgsize</span></tt> is non-zero
it determines the maximum size in bytes of a message. If
either is zero then the system default (which is finite) is
used. (For instance on my Linux system the default maximum
number of messages in a queue is 10 and the maximum message
size is 8192 bytes.) A <tt class="docutils literal"><span class="pre">PosixQueue</span></tt> object has attributes
<tt class="docutils literal"><span class="pre">_maxmsg</span></tt> and <tt class="docutils literal"><span class="pre">_maxsize</span></tt> which give these limits for that
queue.</p>
<p>Note that if <tt class="docutils literal"><span class="pre">maxsize</span></tt> or <tt class="docutils literal"><span class="pre">msgsize</span></tt> is larger than the system
maximum then an <tt class="docutils literal"><span class="pre">OSError</span></tt> exception will be thrown. On Linux
the system maximums can viewed and modified through the
<tt class="docutils literal"><span class="pre">/proc</span></tt> filesystem --- see <tt class="docutils literal"><span class="pre">man</span> <span class="pre">7</span> <span class="pre">mq_overview</span></tt>.</p>
<p class="last">Only available on Unix and only if support for posix queues
was built in to <tt class="docutils literal"><span class="pre">_processing</span></tt>.</p>
<dd><p class="first">A faster alternative to <tt class="docutils literal"><span class="pre">Queue()</span></tt> which is available on Unix
systems which support Posix message queues.</p>
<p>However, posix queues have a maximum number of messages that
can occupy the queue at a given time, and each message (when
expressed as a pickled string) has a maximum length --- see
<tt class="docutils literal"><span class="pre">man</span> <span class="pre">7</span> <span class="pre">mq_overview</span></tt>.</p>
<p><tt class="docutils literal"><span class="pre">maxsize</span></tt> if specified determines the maximum number of items
that be in the queue. Note that unlike Pythons's normal queue
type if this is greater than a system defined maximum then an
error is raised. If <tt class="docutils literal"><span class="pre">maxsize</span></tt> is zero then this maximum value
is used.</p>
<p><tt class="docutils literal"><span class="pre">msgsize</span></tt> if specified determines the maximum length that each
message can be (when expressed as a pickled string). If this
is greater than a system defined maximum then an error is
raised. If <tt class="docutils literal"><span class="pre">msgsize</span></tt> is zero then this maximum value is used.</p>
<p class="last">If one tries to send a message which is too long then
<tt class="docutils literal"><span class="pre">ValueError</span></tt> will be raised.</p>
</dd>

@@ -275,5 +283,5 @@ </dl>

<dt><tt class="docutils literal"><span class="pre">enableLogging(level,</span> <span class="pre">HandlerType=None,</span> <span class="pre">handlerArgs=(),</span> <span class="pre">format=None)</span></tt></dt>
<dd><p class="first">Enables logging and sets the debug level to <tt class="docutils literal"><span class="pre">level</span></tt> -- see
documentation for the <tt class="docutils literal"><span class="pre">logging</span></tt> package in the standard
library.</p>
<dd><p class="first">Enables logging and sets the debug level used by the package's
logger to <tt class="docutils literal"><span class="pre">level</span></tt> -- see documentation for the <tt class="docutils literal"><span class="pre">logging</span></tt>
package in the standard library.</p>
<p>If <tt class="docutils literal"><span class="pre">HandlerType</span></tt> is specified then a handler is created using

@@ -280,0 +288,0 @@ <tt class="docutils literal"><span class="pre">HandlerType(*handlerArgs)</span></tt> and this will be used by the

@@ -24,3 +24,3 @@ .. include:: header.txt

**exception** `BufferTooShort`
Exception raise by the `recvbytes_into()` method of a
Exception raised by the `recvbytes_into()` method of a
`connection object <connection-ref.html#connection-objects>`_

@@ -73,78 +73,89 @@ when the supplied buffer object is too small for the message

Also note that if a process is killed while it is trying to
read or write to a pipe then the data in the pipe is likely to
become corrupted (because it may become impossible to be sure
where the message boundaries lie).
On Windows this requires the `_processing` extension.
`Queue(maxsize=0)`
Alias for either `PosixQueue` if available or else `PipeQueue`
-- see below.
Returns a process shared queue implemented using a pipe and a
few locks/semaphores. A background thread transfers objects
from a buffer into the pipe.
It differs from Python standard `Queue.Queue` type by only
have finite capacity, so even when `maxsize` is specified as
`0` the `put()` method might block. If you need to be sure
that `put()` will not block then you should use
`BufferedQueue()` instead.
It is a near clone of `Queue.Queue` except that the `qsize()`
method is not implemented and that the `task_done()` and
`join()` methods introduced in Python 2.5 are also missing.
`BufferedQueue()`
Alias for either `BufferedPosixQueue` if available or else
`BufferedPipeQueue`.
`Queue` has a few additional methods:
Differs from `Queue()` by guaranteeing that the `put()` method
will not block. A buffered queue tends to be slower than a
normal queue if you are putting items on the queue one by one
(rather than using `putmany()`).
`putmany(iterable)`
If the queue has infinite size then this adds all
items in the iterable to the queue's buffer. So
`q.putmany(X)` is a faster alternative to `for x in X:
q.put(x)`. Raises an error if the queue has finite
size.
The first time a process puts an item on a buffered queue a
thread is started whose job is to transfer items from a buffer
onto the true interprocess queue. In addition to the usual
queue methods `BufferedQueue` supports two extra:
`close()`
Indicates that no more data will be put on this queue
by the current process. The background thread will
quit once it has flushed all it data. This is called
automatically when the queue is garbage collected.
`putmany(iterable)`
Adds all items in the iterable to the queue's buffer.
So `q.putmany(X)` is a faster equivalent of
`for x in X: q.put(x)`.
`jointhread()`
This joins the background thread and can only be used
after `close()` has been called. This blocks until
the background thread exits, ensuring that all data in
the buffer has been flushed to the pipe.
`close()`
Flushes data from the buffer to the interprocess
queue, then instructs the thread to stop and waits for
it to do so. This will be called automatically when
the queue is garbage collected or when the process
exits.
By default if a process is not the creator of the
queue then on exit it will attempt to join the queue's
background thread. The process can call
`canceljoin()` to prevent this behaviour.
`PipeQueue(maxsize=0)`
Returns a near clone of `Queue.Queue` except that the
`qsize()` method is not implemented. It is implemented using
a pipe and some locks/semaphores.
On Unix if a client terminates while it is reading or writing
from the queue, other clients reading from the queue may lose
track of where messages boundaries are or may retrieve
incomplete messages. At least on Windows a keyboard interrupt
(SIGINT) or the use of a process's `stop()` method should not
cause such a problem.
Placing an object on a `PipeQueue` can block because of lack
of buffer space even if a zero timeout is used.
Requires support for native semaphore support from `_processing`.
`canceljoin()`
Prevents the background thread from being joined
automatically when the process exits. Unnecessary if
the current process created the queue.
Note that if a process is killed while it is trying to receive
or send to a queue then the data in the queue is likely to
become corrupted (because it may become impossible to be sure
where the message boundaries lie).
`SimpleQueue()`
A simplified and faster alternative to `Queue()`. It is
really just a pipe protected by a couple of locks.
It has `get()` and `put()` methods but these do not have
`block` or `timeout` arguments. It also has an `empty()`
method.
**Warning**: Unlike with `Queue` (when `maxsize` is zero)
using `put()` may block if the pipe's buffer does not have
sufficient space, so one must take care that no deadlocks are
possible.
`PosixQueue(maxsize=0, msgsize=0)`
Returns a near clone of `Queue.Queue` implemented using a (Unix
only) posix message queue.
If `maxsize` is non-zero it determines the maximum number of
messages that can be in the queue and if `msgsize` is non-zero
it determines the maximum size in bytes of a message. If
either is zero then the system default (which is finite) is
used. (For instance on my Linux system the default maximum
number of messages in a queue is 10 and the maximum message
size is 8192 bytes.) A `PosixQueue` object has attributes
`_maxmsg` and `_maxsize` which give these limits for that
queue.
A faster alternative to `Queue()` which is available on Unix
systems which support Posix message queues.
Note that if `maxsize` or `msgsize` is larger than the system
maximum then an `OSError` exception will be thrown. On Linux
the system maximums can viewed and modified through the
`/proc` filesystem --- see `man 7 mq_overview`.
However, posix queues have a maximum number of messages that
can occupy the queue at a given time, and each message (when
expressed as a pickled string) has a maximum length --- see
`man 7 mq_overview`.
Only available on Unix and only if support for posix queues
was built in to `_processing`.
`maxsize` if specified determines the maximum number of items
that be in the queue. Note that unlike Pythons's normal queue
type if this is greater than a system defined maximum then an
error is raised. If `maxsize` is zero then this maximum value
is used.
`msgsize` if specified determines the maximum length that each
message can be (when expressed as a pickled string). If this
is greater than a system defined maximum then an error is
raised. If `msgsize` is zero then this maximum value is used.
If one tries to send a message which is too long then
`ValueError` will be raised.

@@ -151,0 +162,0 @@

@@ -69,16 +69,2 @@ <?xml version="1.0" encoding="utf-8" ?>

</dd>
<dt><em>Queues, deadlocks and buffering</em></dt>
<dd><p class="first">Unlike Python's standard <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> type (when a maximum size
is not specified), <tt class="docutils literal"><span class="pre">processing.Queue</span></tt> does not guarantee that
<tt class="docutils literal"><span class="pre">put()</span></tt> will succeed without blocking. If you do need this
guarantee then you can use <tt class="docutils literal"><span class="pre">processing.BufferedQueue</span></tt> instead.</p>
<p class="last">As an example, a standard pattern is for a main process to push
some tasks onto a task queue so that they will be processed by a
pool of worker processes and then for the main process to retreive
the results from a result queue. If both queues are unbuffered
then one will get a deadlock if one tries to push too many tasks
on the task queue without removing any from the result queue.
Instead one should probably make the task queue buffered -- see
<a class="reference" href="../test/test_workers.py">test_workers.py</a>.</p>
</dd>
</dl>

@@ -88,4 +74,3 @@ </div>

<h1><a id="windows" name="windows">Windows</a></h1>
<p>Platforms such as Windows which lack <tt class="docutils literal"><span class="pre">os.fork()</span></tt> have a few extra
restrictions:</p>
<p>Since Windows lacks <tt class="docutils literal"><span class="pre">os.fork()</span></tt> it has a few extra restrictions:</p>
<p><em>More picklability</em>:</p>

@@ -92,0 +77,0 @@ <blockquote>

@@ -69,23 +69,7 @@ .. include:: header.txt

*Queues, deadlocks and buffering*
Unlike Python's standard `Queue.Queue` type (when a maximum size
is not specified), `processing.Queue` does not guarantee that
`put()` will succeed without blocking. If you do need this
guarantee then you can use `processing.BufferedQueue` instead.
As an example, a standard pattern is for a main process to push
some tasks onto a task queue so that they will be processed by a
pool of worker processes and then for the main process to retreive
the results from a result queue. If both queues are unbuffered
then one will get a deadlock if one tries to push too many tasks
on the task queue without removing any from the result queue.
Instead one should probably make the task queue buffered -- see
`test_workers.py <../test/test_workers.py>`_.
Windows
-------
Platforms such as Windows which lack `os.fork()` have a few extra
restrictions:
Since Windows lacks `os.fork()` it has a few extra restrictions:

@@ -92,0 +76,0 @@ *More picklability*:

@@ -44,2 +44,6 @@ <?xml version="1.0" encoding="utf-8" ?>

</ul>
<p>The project is hosted at</p>
<ul class="simple">
<li><a class="reference" href="http://developer.berlios.de/projects/pyprocessing">http://developer.berlios.de/projects/pyprocessing</a></li>
</ul>
<p>The package can be downloaded from</p>

@@ -46,0 +50,0 @@ <ul class="simple">

@@ -8,3 +8,3 @@ import mmap

from processing import synchronize, process
from processing import synchronize, process, queue, _processing
from struct import pack as _pack, unpack as _unpack, calcsize as _calcsize

@@ -14,3 +14,2 @@

#

@@ -24,36 +23,39 @@ # Heap implementation

'''
def __init__(self, size=256, _name=None, _lock=None):
assert size >= 256
def __init__(self, size, lock=None):
fd, self._name = tempfile.mkstemp(prefix='pym-')
self._size = remaining = size
while remaining > 0:
remaining -= os.write(fd, '\0' * remaining)
self._lock = lock or RLock()
self._mmap = mmap.mmap(fd, self._size)
os.close(fd)
self._set_pos(8)
if _name is None:
fd, self._name = tempfile.mkstemp(prefix='pym-')
remaining = size
while remaining > 0:
remaining -= os.write(fd, '\0' * remaining)
self._lock = _lock or RLock()
if sys.platform in ('win32', 'cygwin'):
process.Finalize(self, Heap._finalize_heap,
args=[self._mmap, self._name], atexit=True)
else:
assert _lock is not None
self._lock = _lock
self._name = _name
fd = os.open(_name, os.O_RDWR, 0600)
os.unlink(self._name)
self._size = size
def __getstate__(self):
if sys.platform != 'win32':
raise NotImplemented
return self._size, self._name, self._lock
def __setstate__(self, state):
self._size, self._name, self._lock = state
fd = os.open(self._name, os.O_RDWR, 0600)
self._mmap = mmap.mmap(fd, self._size)
os.close(fd)
if _name is None:
self._set_pos(8)
if sys.platform in ('win32', 'cygwin'):
def close_mmap(mmap, name):
import os
mmap.close()
os.unlink(name)
process.Finalize(
self, close_mmap, args=[self._mmap, self._name],
atexit=True
)
else:
os.unlink(self._name)
@staticmethod
def _finalize_heap(mmap, name):
import os
mmap.close()
try:
os.unlink(name)
except WindowsError:
process.info('failed to unlink %r -- try again later', name)
process._trylater.append((os.unlink, [name]))
def malloc(self, bytes):

@@ -88,53 +90,49 @@ '''

'''
self._lock.acquire()
try:
# resize mmap if the file has been enlarged by other process
if self._size < self._mmap.size():
self._mmap.resize(self._mmap.size())
self._size = len(self._mmap)
start = stop = None
# iterate over empty chunks
for start, stop in self._get_empty_chunks():
# if chunk is large enough
if stop - start >= bytes:
# write header for the chunk we are creating
self._set_chunk_info(start, start + bytes, True)
# write header for following empty chunk if necessary
if start + bytes < stop:
self._set_chunk_info(start + bytes, stop, False)
# update position of last empty chunk
if start == self._get_pos():
try:
pos, _ = self._get_empty_chunks().next()
except StopIteration:
pos = self._size
self._set_pos(pos)
return start
# get position of memory just after last occupied chunk
if stop == self._size:
pos = start
else:
pos = self._size
# work out how much space we need and resize mmap
while pos + bytes > self._size:
self._size *= 2
if self._size > len(self._mmap):
self._mmap.resize(self._size)
# mark chunk as occupied and update position of last empty chunk
self._set_chunk_info(pos, pos + bytes, True)
if pos == self._get_pos():
self._set_pos(pos + bytes)
return pos
finally:
self._lock.release()
# resize mmap if the file has been enlarged by other process
if self._size < self._mmap.size():
self._mmap.resize(self._mmap.size())
self._size = len(self._mmap)
start = stop = None
# iterate over empty chunks
for start, stop in self._get_empty_chunks():
# if chunk is large enough
if stop - start >= bytes:
# write header for the chunk we are creating
self._set_chunk_info(start, start + bytes, True)
# write header for following empty chunk if necessary
if start + bytes < stop:
self._set_chunk_info(start + bytes, stop, False)
# update position of last empty chunk
if start == self._get_pos():
try:
pos, _ = self._get_empty_chunks().next()
except StopIteration:
pos = self._size
self._set_pos(pos)
return start
# get position of memory just after last occupied chunk
if stop == self._size:
pos = start
else:
pos = self._size
# work out how much space we need and resize mmap
while pos + bytes > self._size:
self._size *= 2
if self._size > len(self._mmap):
self._mmap.resize(self._size)
# mark chunk as occupied and update position of last empty chunk
self._set_chunk_info(pos, pos + bytes, True)
if pos == self._get_pos():
self._set_pos(pos + bytes)
return pos
def _free(self, start):

@@ -236,7 +234,2 @@ '''

if sys.platform == 'win32':
def __reduce__(self):
return type(self), (self._size, self._name, self._lock)
#

@@ -459,3 +452,3 @@ # Class for a struct which lives in shared memory

Event = synchronize.Event
Queue = synchronize.Queue
Queue = queue.Queue
Lock = synchronize.Lock

@@ -471,3 +464,3 @@ RLock = synchronize.RLock

if not hasattr(self, '_heap'):
self._heap = Heap(size=self._size, _lock=self._lock)
self._heap = Heap(size=self._size, lock=self._lock)
return self._heap

@@ -474,0 +467,0 @@ finally:

@@ -23,3 +23,3 @@ #

from processing.process import Process, currentProcess, Finalize
from processing.process import debug, info, log
from processing.process import debug, info, subdebug, activeChildren

@@ -384,3 +384,6 @@ try:

info('manager received shutdown message')
process._exit_func()
Finalize._run_all_finalizers()
for p in activeChildren():
debug('terminating a child process of manager')
p.terminate()
info('process exiting with `os.exit(0)`')

@@ -430,25 +433,2 @@ os._exit(0)

def _run_server(registry, address, authkey, parent_address):
'''
Instantiate a server and run it.
This function is invoked in a subprocess.
Really it should be a staticmethod of `BaseManager`, but that
would cause problems with pickling.
'''
# create server
server = Server(registry, address, authkey)
currentProcess()._server = server
# inform parent process of the server's address
connection = Client(parent_address)
connection.send(server.address)
connection.close()
# run the manager
info('manager bound to %r' % server.address)
server.serve_forever()
class BaseManager(object):

@@ -516,2 +496,20 @@ '''

@classmethod
def _run_server(cls, registry, address, authkey, parent_address):
'''
Instantiate a server and run it
'''
# create server
server = Server(registry, address, authkey)
currentProcess()._server = server
# inform parent process of the server's address
connection = Client(parent_address)
connection.send(server.address)
connection.close()
# run the manager
info('manager bound to %r', server.address)
server.serve_forever()
def serve_forever(self):

@@ -568,3 +566,3 @@ '''

# staticmethod
@staticmethod
def _get_registry_creators(self_or_cls):

@@ -582,4 +580,2 @@ registry = {}

return registry, creators
_get_registry_creators = staticmethod(_get_registry_creators)

@@ -592,3 +588,3 @@ def __enter__(self):

# staticmethod
@staticmethod
def _finalize_manager(process, address, authkey):

@@ -617,6 +613,2 @@ '''

_finalize_manager = staticmethod(_finalize_manager)
_run_server = staticmethod(_run_server)
address = property(lambda self: self._address)

@@ -752,3 +744,3 @@

# staticmethod
@staticmethod
def _finalize_proxy(connection, key, token, private):

@@ -777,3 +769,2 @@ private._mutex.acquire()

private._mutex.release()
_finalize_proxy = staticmethod(_finalize_proxy)

@@ -792,3 +783,3 @@ def __reduce__(self):

return '<Proxy[%s] object at %s>' % (self._token.typeid,
'0x%x' % (id(self) & 0xffffffff))
'0x%x' % id(self))

@@ -872,3 +863,3 @@ def __str__(self):

'''
log(5, 'reset all proxies')
subdebug('reset all proxies')
_Private._mutex.acquire()

@@ -883,4 +874,4 @@ try:

proxy._connect(authkey=authkey, name=process_name)
except KeyError:
pass
except KeyError, e:
debug('ignoring KeyError: %r', e)
finally:

@@ -887,0 +878,0 @@ _Private._mutex.release()

Metadata-Version: 1.0
Name: processing
Version: 0.37
Version: 0.38
Summary: Package for using processes mimicking the `threading` module

@@ -29,3 +29,6 @@ Home-page: http://developer.berlios.de/projects/pyprocessing

The project is hosted at
* http://developer.berlios.de/projects/pyprocessing
The package can be downloaded from

@@ -32,0 +35,0 @@

+171
-80

@@ -21,3 +21,13 @@ #

from processing.process import debug
#
# Constants representing the state of a pool
#
RUN = 0
CLOSE = 1
TERMINATE = 2
#
# Miscellaneous

@@ -39,2 +49,4 @@ #

def worker(inqueue, outqueue):
put = outqueue.put
for job, i, func, args, kwds in iter(inqueue.get, None):

@@ -45,8 +57,10 @@ try:

result = (False, e)
outqueue.put((job, i, result))
put((job, i, result))
debug('worker got None -- exiting')
#
# Class representing a process pool
#
class Pool(object):

@@ -57,6 +71,7 @@ '''

def __init__(self, processes=None):
self._inqueue = processing.PipeQueue()
self._outqueue = processing.PipeQueue()
self._inqueue = processing.SimpleQueue()
self._outqueue = processing.SimpleQueue()
self._taskqueue = Queue.Queue()
self._cache = {}
self._state = RUN

@@ -79,24 +94,26 @@ if processes is None:

# Note: these threads will be joined by the finalizer and are
# only made daemonic because otherwise the exit handler
# registered by `threading` might try to join them
# before the finalizer is run
thandler = threading.Thread(target=Pool._handle_tasks,
args=[self._taskqueue, self._inqueue])
thandler.setDaemon(True)
thandler.start()
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
args=[self._taskqueue, self._inqueue, self._outqueue,
self._pool]
)
self._task_handler.setDaemon(True)
self._task_handler._state = RUN
self._task_handler.start()
rhandler = threading.Thread(target=Pool._handle_results,
args=[self._outqueue, self._cache])
rhandler.setDaemon(True)
rhandler.start()
self._result_handler = threading.Thread(
target=Pool._handle_results,
args=[self._outqueue, self._cache]
)
self._result_handler.setDaemon(True)
self._result_handler._state = RUN
self._result_handler.start()
self.shutdown = processing.Finalize(
self, Pool._finalize_pool,
args=(self._taskqueue, self._inqueue, self._outqueue,
thandler, rhandler, self._pool),
self._terminate = processing.Finalize(
self, Pool._terminate_pool,
args=[self._taskqueue, self._outqueue, self._cache, self._pool,
self._task_handler, self._result_handler],
atexit=True
)
def apply(self, func, args=(), kwds={}):

@@ -106,30 +123,48 @@ '''

'''
assert self._state == RUN
return self.apply_async(func, args, kwds).get()
def map(self, func, seq, chunksize=None):
def map(self, func, iterable, chunksize=None):
'''
Equivalent of `map()` builtin
'''
return self.map_async(func, seq, chunksize).get()
assert self._state == RUN
return self.map_async(func, iterable, chunksize).get()
def imap(self, func, seq):
def imap(self, func, iterable, chunksize=1):
'''
Equivalent of `itertool.imap()` -- can be MUCH slower than `Pool.map()`
'''
result = IMapIterator(self._cache)
job = result._job
self._taskqueue.put((((job, i, func, (x,), {})
for i, x in enumerate(seq)), result._setlength))
return result
assert self._state == RUN
if chunksize == 1:
result = IMapIterator(self._cache)
self._taskqueue.put((((result._job, i, func, (x,), {})
for i, x in enumerate(iterable)), result._setlength))
return result
else:
assert chunksize > 1
task_batches = Pool._gettasks(func, iterable, chunksize)
result = IMapIterator(self._cache)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), result._setlength))
return (item for chunk in result for item in chunk)
def imap_unordered(self, func, seq):
def imap_unordered(self, func, iterable, chunksize=1):
'''
Like `imap()` method but ordering of results is arbitrary
'''
result = IMapUnorderedIterator(self._cache)
job = result._job
self._taskqueue.put((((job, i, func, (x,), {})
for i, x in enumerate(seq)), result._setlength))
return result
assert self._state == RUN
if chunksize == 1:
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put((((result._job, i, func, (x,), {})
for i, x in enumerate(iterable)), result._setlength))
return result
else:
assert chunksize > 1
task_batches = Pool._gettasks(func, iterable, chunksize)
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), result._setlength))
return (item for chunk in result for item in chunk)
def apply_async(self, func, args=(), kwds={}, callback=None):

@@ -139,30 +174,35 @@ '''

'''
assert self._state == RUN
result = ApplyResult(self._cache, callback)
job = result._job
self._taskqueue.put(([(job, None, func, args, kwds)], None))
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
def map_async(self, func, seq, chunksize=None, callback=None):
def map_async(self, func, iterable, chunksize=None, callback=None):
'''
Asynchronous equivalent of `map()` builtin
'''
if not hasattr(seq, '__len__'):
seq = list(seq)
assert self._state == RUN
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
if chunksize is None:
chunksize, extra = divmod(len(seq), len(self._pool) * 4)
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
chunksize += 1
task_batches = Pool._gettasks(func, seq, chunksize)
result = MapResult(self._cache, chunksize, len(seq), callback)
job = result._job
self._taskqueue.put((((job, i, mapstar, (x,), {})
task_batches = Pool._gettasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), None))
return result
# staticmethod
def _handle_tasks(taskqueue, inqueue):
put = inqueue._put
@staticmethod
def _handle_tasks(taskqueue, inqueue, outqueue, pool):
thread = threading.currentThread()
put = inqueue._writer.send
for taskseq, setlength in iter(taskqueue.get, None):
if thread._state:
debug('task handler found thread._state != RUN -- exiting')
return
i = -1

@@ -173,7 +213,25 @@ for i, task in enumerate(taskseq):

setlength(i+1)
_handle_tasks = staticmethod(_handle_tasks)
else:
debug('task handler got None')
# staticmethod
# tell workers there is no more work
debug('task handler sending None to workers')
for p in pool:
put(None)
# tell result handler to finish when cache is empty
outqueue.put(None)
debug('task handler exiting')
@staticmethod
def _handle_results(outqueue, cache):
for job, i, obj in iter(outqueue._get, None):
thread = threading.currentThread()
get = outqueue._reader.recv
for job, i, obj in iter(get, None):
if thread._state:
assert thread._state == TERMINATE
debug('result handler found thread._state=TERMINATE')
return
try:

@@ -183,31 +241,19 @@ cache[job]._set(i, obj)

pass
_handle_results = staticmethod(_handle_results)
# staticmethod
def _finalize_pool(taskqueue, inqueue, outqueue,
task_handler, result_handler, pool):
# stop task handler thread
taskqueue.not_empty.acquire()
try:
taskqueue.queue.clear()
taskqueue.queue.append(None)
taskqueue.not_empty.notify()
finally:
taskqueue.not_empty.release()
else:
debug('result handler got None')
# stop result handler thread
outqueue.put(None)
while cache and thread._state == RUN:
item = get()
if item is None:
debug('result handler ignoring None')
job, i, obj = item
try:
cache[job]._set(i, obj)
except KeyError:
pass
# stop work processes
for p in pool:
inqueue.put(None)
debug('result handler exiting: len(cache)=%s, thread._state=%s',
len(cache), thread._state)
# join all the threads and processes
task_handler.join()
result_handler.join()
for p in pool:
p.join()
_finalize_pool = staticmethod(_finalize_pool)
# staticmethod
@staticmethod
def _gettasks(func, it, size):

@@ -220,4 +266,49 @@ it = iter(it)

yield (func, x)
_gettasks = staticmethod(_gettasks)
def __reduce__(self):
raise NotImplemented
def close(self):
debug('closing pool')
self._state = CLOSE
self._taskqueue.put(None)
def terminate(self):
debug('terminating pool')
self._state = TERMINATE
self._terminate()
def join(self):
debug('joining pool')
assert self._state in (CLOSE, TERMINATE)
self._task_handler.join()
self._result_handler.join()
for p in self._pool:
p.join()
shutdown = terminate # depracated alias
@staticmethod
def _terminate_pool(taskqueue, outqueue, cache, pool,
task_handler, result_handler):
debug('finalizing pool')
if not result_handler.isAlive():
debug('result handler already finished -- no need to terminate')
return
cache = {}
task_handler._state = TERMINATE
result_handler._state = TERMINATE
taskqueue.put(None)
outqueue.put(None)
debug('terminating pool workers')
for p in pool:
if p.isAlive():
p.terminate()
task_handler.join()
result_handler.join()
#

@@ -224,0 +315,0 @@ # Class whose instances are returned by `Pool.apply_async()`

+132
-82

@@ -11,3 +11,3 @@ #

'Process', 'currentProcess', 'activeChildren', 'freezeSupport',
'ProcessExit', 'Finalize', 'getLogger', 'enableLogging'
'ProcessExit', 'Finalize', 'getLogger', 'enableLogging', 'note'
]

@@ -19,3 +19,12 @@

import os, sys, signal, subprocess, time, atexit, weakref, random, itertools
import os
import sys
import signal
import subprocess
import time
import atexit
import weakref
import random
import itertools
import copy_reg
import encodings.hex_codec # hint to freeze tools that we need hex codec

@@ -266,3 +275,4 @@

return '<%s(%s, %s)>' % (type(self).__name__, self._name, status)
return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
status, self._daemonic and ' daemon' or '')

@@ -282,4 +292,4 @@ ##

_current_process = self
info('process starting up')
try:
info('child process calling self.run()')
self.run()

@@ -371,3 +381,3 @@ exitcode = 0

'''
log(5, 'dummy reset all proxies')
subdebug('dummy reset all proxies')

@@ -523,58 +533,15 @@ #

#
# Support for finalization of objects using weakrefs
# Make instance and class methods picklable
#
class Finalize(object):
'''
Register a callback to be run once before 'obj' is garbage collected.
If 'obj' is not collected by time of process termination then the
callback will be run if 'atexit' parameter of constructor was true.
'''
_registry = {}
_getindex = itertools.count().next
def __init__(self, obj, callback, args=(), atexit=False):
self._weakref = weakref.ref(obj, self)
self._callback = callback
self._args = args
self._atexit = atexit
self._index = Finalize._getindex()
Finalize._registry[self] = None
def __call__(self, ignore=None):
'''
Run the callback if it has not already been run
'''
log(5, 'calling %s from finalizer', self._callback)
try:
del Finalize._registry[self]
except KeyError:
pass
else:
self._callback(*self._args)
self._weakref = self._callback = self._args = None
_MethodType = type(Process.start)
def __repr__(self):
return '<Finalizer(callback=%r, args=%r, atexit=%r)>' % \
(self._callback, self._args, self._atexit)
def _reduce_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
# staticmethod
def _run_all_finalizers():
'''
Run remaining callbacks (in reverse order of registration)
'''
L = Finalize._registry.keys()
L.sort(cmp=lambda x,y: -cmp(x._index, y._index))
copy_reg.pickle(_MethodType, _reduce_method)
for finalizer in L:
if finalizer._atexit:
try:
finalizer()
except Exception:
import traceback
traceback.print_exc()
_run_all_finalizers = staticmethod(_run_all_finalizers)
#

@@ -586,2 +553,6 @@ # Logging

def subdebug(msg, *args):
if _logger:
_logger.subdebug(msg, *args)
def debug(msg, *args):

@@ -595,5 +566,8 @@ if _logger:

def log(level, msg, *args):
def note(msg, *args):
if _logger:
_logger.log(level, msg, *args)
_logger.note(msg, *args)
else:
print >>sys.stderr, ('[NOTE/%s] ' + msg) % \
((currentProcess().getName(),)+args)

@@ -618,3 +592,2 @@ def getLogger():

_logger.propagate = 0
logging.addLevelName(5, 'SUBDEBUG')

@@ -628,2 +601,7 @@ # we want `_logger` to support the "%(processName)s" format

logging.addLevelName(5, 'SUBDEBUG')
logging.addLevelName(31, 'NOTE')
_logger.subdebug = _MethodType(_logger.log, 5)
_logger.note = _MethodType(_logger.log, 31)
# cleanup func of `processing` should run before that of `logging`

@@ -648,2 +626,71 @@ atexit._exithandlers.remove((_exit_func, (), {}))

#
# Support for finalization of objects using weakrefs
#
class Finalize(object):
'''
Register a callback to be run once before 'obj' is garbage collected.
If 'obj' is not collected by time of process termination then the
callback will be run if 'atexit' parameter of constructor was true.
'''
_registry = {}
_getindex = itertools.count().next
_exiting = False
def __init__(self, obj, callback, args=(), priority=0, atexit=False):
self._weakref = weakref.ref(obj, self)
self._callback = callback
self._args = args
self._priority = priority
self._atexit = atexit
self._orderkey = (priority, Finalize._getindex())
Finalize._registry[self] = None
def __call__(self, ignore=None):
'''
Run the callback if it has not already been run
'''
subdebug('calling %s from finalizer %s', self._callback,self._orderkey)
try:
del Finalize._registry[self]
except KeyError:
subdebug('finalizer %s already called', self._callback)
else:
self._callback(*self._args)
self._weakref = self._callback = self._args = None
def cancel(self):
'''
Cancel the callback
'''
try:
del Finalize._registry[self]
except KeyError:
pass
def __repr__(self):
return '<Finalize(callback=%r, args=%r, atexit=%r)>' % \
(self._callback, self._args, self._atexit)
@staticmethod
def _run_all_finalizers():
'''
Run remaining callbacks (in reverse order of registration)
'''
Finalize._exiting = True
L = Finalize._registry.keys()
L.sort(key=lambda f: f._orderkey, reverse=True)
for finalizer in L:
if finalizer._atexit:
try:
finalizer()
except Exception:
import traceback
traceback.print_exc()
_trylater = []
#
# Clean up on exit

@@ -653,29 +700,32 @@ #

def _exit_func():
try:
info('running all "atexit" finalizers')
Finalize._run_all_finalizers()
finally:
for p in activeChildren():
if p._daemonic and p._stoppable:
info('calling `stop()` for daemon %s', p.getName())
p._popen.stop()
info('running all "atexit" finalizers')
Finalize._run_all_finalizers()
deadline = time.time() + 0.1
for p in activeChildren():
if p._daemonic and p._stoppable:
info('calling `stop()` for daemon %s', p.getName())
p._popen.stop()
for p in activeChildren():
if p._daemonic and p._stoppable:
info('calling `join(timeout)` for daemon %s', p.getName())
p.join(deadline - time.time())
for p in activeChildren():
if p._daemonic:
info('calling `terminate()` for daemon %s', p.getName())
p._popen.terminate()
deadline = time.time() + 0.1
for p in activeChildren():
if not p._daemonic:
info('calling `join()` for normal process %s', p.getName())
p.join()
for p in activeChildren():
if p._daemonic and p._stoppable:
info('calling `join(timeout)` for daemon %s', p.getName())
p.join(deadline - time.time())
for p in activeChildren():
if p._daemonic:
info('calling `terminate()` for daemon %s', p.getName())
p._popen.terminate()
for p in activeChildren():
info('calling `join()` for process %s', p.getName())
p.join()
for (func, args) in _trylater:
try:
func(*args)
except:
info('failed to do "trylater" call: %s', (func, args))
atexit.register(_exit_func)

@@ -30,3 +30,6 @@ ===================

The project is hosted at
* http://developer.berlios.de/projects/pyprocessing
The package can be downloaded from

@@ -33,0 +36,0 @@

@@ -17,2 +17,5 @@ #

#
#
#

@@ -23,12 +26,13 @@ if (not hasattr(_processing, 'HandleFromPidHandle') and

try:
fromfd = socket.fromfd
except AttributeError:
def fromfd(fd, family, type, proto=0):
s = socket._socket.socket()
_processing.changefd(s, fd, family, type, proto)
return s
if sys.platform == 'win32':
def fromfd(fd, family, type, proto=0):
assert family == socket.AF_INET
assert type == socket.SOCK_STREAM
assert proto == 0
return _processing.falsesocket(fd)
closefd = _processing.CloseHandle
else:
fromfd = socket.fromfd
closefd = os.close

@@ -49,3 +53,3 @@

def reduce_handle(handle):
process.log(5, 'reducing handle %d', handle)
process.subdebug('reducing handle %d', handle)
return (os.getpid(), handle)

@@ -55,3 +59,3 @@

pid, old_handle = reduced_handle
process.log(5, 'rebuilding handle %d from PID=%d', old_handle, pid)
process.subdebug('rebuilding handle %d from PID=%d', old_handle, pid)
return _processing.HandleFromPidHandle(pid, old_handle)

@@ -76,2 +80,3 @@

#
def reduce_pipe_connection(conn):

@@ -138,3 +143,3 @@ return rebuild_pipe_connection, (reduce_handle(conn.fileno()),)

_fd_cache.add(dup_fd)
process.log(5, 'reducing fd %d', fd)
process.subdebug('reducing fd %d', fd)
return (_fd_listener.address, dup_fd)

@@ -145,3 +150,3 @@

address, fd = reduced_handle
process.log(5, 'rebuilding fd %d', fd)
process.subdebug('rebuilding fd %d', fd)
conn = Client(address, authenticate=True)

@@ -191,13 +196,9 @@ conn.send(fd)

# have to guess family, type, proto
try:
address = s.getsockname()
except AttributeError:
Family, Type, Proto = socket.AF_INET, socket.SOCK_STREAM, 0
address = s.getsockname()
if type(address) is str:
Family = socket.AF_UNIX
else:
if type(address) is str:
Family = socket.AF_UNIX
else:
Family = socket.AF_INET
Type = s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
Proto = 0
Family = socket.AF_INET
Type = s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
Proto = 0
reduced_handle = reduce_handle(s.fileno())

@@ -204,0 +205,0 @@ return rebuild_socket, (reduced_handle, Family, Type, Proto)

@@ -142,9 +142,3 @@ #

if sys.version_info >= (2, 4, 0):
kwds['package_data'] = {'processing': data}
else:
# not sure if it will install on Python 2.3 -- probably not
processing_path = join(dirname(os.__file__), 'site-packages', 'processing')
kwds['data_files'] = [(join(processing_path, dirname(pat)), glob(pat))
for pat in data]
kwds['package_data'] = {'processing': data}

@@ -151,0 +145,0 @@ #

@@ -15,2 +15,8 @@ /*

#define CHECKHANDLE(self) \
if (self->handle == INVALID_HANDLE) { \
PyErr_SetString(PyExc_OSError, "handle is invalid"); \
return NULL; \
}
typedef struct {

@@ -35,7 +41,4 @@ PyObject_HEAD

if (self->handle == INVALID_HANDLE) {
PyErr_SetString(PyExc_AssertionError, "handle is invalid");
return NULL;
}
CHECKHANDLE(self);
if (!PyArg_ParseTuple(args, "s#", &buffer, &length))

@@ -61,6 +64,3 @@ return NULL;

if (self->handle == INVALID_HANDLE) {
PyErr_SetString(PyExc_AssertionError, "handle is invalid");
return NULL;
}
CHECKHANDLE(self);

@@ -92,6 +92,3 @@ Py_BEGIN_ALLOW_THREADS

if (self->handle == INVALID_HANDLE) {
PyErr_SetString(PyExc_AssertionError, "handle is invalid");
return NULL;
}
CHECKHANDLE(self);

@@ -133,6 +130,3 @@ if (!PyArg_ParseTuple(args, "w#", &buffer, &length))

if (self->handle == INVALID_HANDLE) {
PyErr_SetString(PyExc_AssertionError, "handle is invalid");
return NULL;
}
CHECKHANDLE(self);

@@ -172,6 +166,3 @@ if (!PyArg_ParseTuple(args, "O", &obj))

if (self->handle == INVALID_HANDLE) {
PyErr_SetString(PyExc_AssertionError, "handle is invalid");
return NULL;
}
CHECKHANDLE(self);

@@ -205,6 +196,3 @@ Py_BEGIN_ALLOW_THREADS

{
if (self->handle == INVALID_HANDLE) {
PyErr_SetString(PyExc_AssertionError, "handle is invalid");
return NULL;
}
CHECKHANDLE(self);

@@ -220,7 +208,4 @@ return Py_BuildValue("i", self->handle);

if (self->handle == INVALID_HANDLE) {
PyErr_SetString(PyExc_AssertionError, "handle is invalid");
return NULL;
}
CHECKHANDLE(self);
if (! PyArg_ParseTuple(args, "|d", &timeout))

@@ -246,3 +231,5 @@ return NULL;

if (self->handle != INVALID_HANDLE) {
Py_BEGIN_ALLOW_THREADS
_close(self->handle);
Py_END_ALLOW_THREADS
self->handle = INVALID_HANDLE;

@@ -360,261 +347,2 @@ }

#ifdef FALSE_SOCKET
/*
* To support sharing of socket objects we need to be able to build a
* socket object from a handle/fd. Unfortunately Windows lacks the
* function `socket.fromfd()` so we define a false socket type.
*/
PyTypeObject FalseSocketType;
static PyObject *
socket_send(Connection* self, PyObject *args)
{
char *buffer = NULL;
int length, nbytes;
if (self->handle == INVALID_HANDLE) {
PyErr_SetString(PyExc_AssertionError, "handle is invalid");
return NULL;
}
if (!PyArg_ParseTuple(args, "s#", &buffer, &length))
return NULL;
Py_BEGIN_ALLOW_THREADS
nbytes = _write(self->handle, buffer, length);
Py_END_ALLOW_THREADS
if (nbytes < 0)
return SetExcFromNumber(nbytes);
return Py_BuildValue("i", nbytes);
}
static PyObject *
socket_recv(Connection* self, PyObject *args)
{
char *buffer = NULL;
int nbytes;
size_t length;
PyObject *result;
if (self->handle == INVALID_HANDLE) {
PyErr_SetString(PyExc_AssertionError, "handle is invalid");
return NULL;
}
if (!PyArg_ParseTuple(args, "I", &length))
return NULL;
if (length == 0) {
PyErr_SetString(PyExc_AssertionError, "length should be > 0");
return NULL;
}
buffer = malloc(length);
if (buffer == NULL)
return PyErr_NoMemory();
Py_BEGIN_ALLOW_THREADS
nbytes = _read(self->handle, buffer, length);
Py_END_ALLOW_THREADS
if (nbytes < 0) {
free(buffer);
return SetExcFromNumber(nbytes);
}
result = Py_BuildValue("s#", buffer, nbytes);
free(buffer);
return result;
}
static PyObject *
socket_sendall(Connection* self, PyObject *args)
{
char *buffer = NULL;
int length, res;
if (self->handle == INVALID_HANDLE) {
PyErr_SetString(PyExc_AssertionError, "handle is invalid");
return NULL;
}
if (!PyArg_ParseTuple(args, "s#", &buffer, &length))
return NULL;
Py_BEGIN_ALLOW_THREADS
res = _sendall(self->handle, buffer, length);
Py_END_ALLOW_THREADS
if (res < 0)
return SetExcFromNumber(res);
Py_RETURN_NONE;
}
static PyObject *
socket_accept(Connection* self)
{
SOCKET handle = INVALID_SOCKET;
struct sockaddr_in addr;
int addrlen;
PyObject *sock = NULL, *args = NULL;
Py_BEGIN_ALLOW_THREADS
handle = accept(self->handle, (struct sockaddr *)&addr, &addrlen);
Py_END_ALLOW_THREADS
if (handle == INVALID_SOCKET) {
SetExcFromNumber(SOME_SOCKET_ERROR);
goto ERR;
}
args = Py_BuildValue("(i)", handle);
if (!(sock = Connection_new(&FalseSocketType, args)))
goto ERR;
closesocket(handle);
Py_DECREF(args);
return Py_BuildValue("(O(si))", sock, inet_ntoa(addr.sin_addr),
ntohs(addr.sin_port));
ERR:
if (handle != INVALID_SOCKET)
closesocket(handle);
Py_XDECREF(sock);
Py_XDECREF(args);
return NULL;
}
static PyObject *
socket_shutdown(Connection* self, PyObject *args)
{
int how, res;
if (self->handle == INVALID_HANDLE) {
PyErr_SetString(PyExc_AssertionError, "handle is invalid");
return NULL;
}
if (!PyArg_ParseTuple(args, "i", &how))
return NULL;
Py_BEGIN_ALLOW_THREADS
res = shutdown(self->handle, how);
Py_END_ALLOW_THREADS
if (res != 0)
return SetExcFromNumber(SOME_SOCKET_ERROR);
Py_RETURN_NONE;
}
static PyObject *
socket_close(Connection* self)
{
if (self->handle != INVALID_HANDLE) {
closesocket(self->handle);
self->handle = INVALID_HANDLE;
}
Py_RETURN_NONE;
}
static void
socket_dealloc(Connection* self)
{
Py_XDECREF(socket_close(self));
self->ob_type->tp_free((PyObject*)self);
}
static PyObject *
socket_notimplemented(Connection* self, PyObject *args)
{
PyErr_SetString(PyExc_NotImplementedError, "not a real socket");
return NULL;
}
static PyMethodDef socket_methods[] = {
{"close", (PyCFunction)socket_close, METH_NOARGS,
"close the connection"},
{"fileno", (PyCFunction)Connection_fileno, METH_NOARGS,
"file descriptor or handle of the connection"},
{"recv", (PyCFunction)socket_recv, METH_VARARGS,
"receive"},
{"send", (PyCFunction)socket_send, METH_VARARGS,
"send"},
{"sendall", (PyCFunction)socket_sendall, METH_VARARGS,
"sendall"},
{"accept", (PyCFunction)socket_accept, METH_NOARGS,
"accept"},
{"shutdown", (PyCFunction)socket_shutdown, METH_VARARGS,
"shutdown"},
{"recv_into", (PyCFunction)socket_notimplemented, METH_VARARGS,
"not implemented"},
{"sendto", (PyCFunction)socket_notimplemented, METH_VARARGS,
"not implemented"},
{"recvfrom", (PyCFunction)socket_notimplemented, METH_VARARGS,
"not implemented"},
{"recvfrom_into", (PyCFunction)socket_notimplemented, METH_VARARGS,
"not implemented"},
{NULL} /* Sentinel */
};
PyTypeObject FalseSocketType = {
PyObject_HEAD_INIT(NULL)
0, /*ob_size*/
"_processing.falsesocket", /*tp_name*/
sizeof(Connection), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)socket_dealloc,
/*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
0, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash*/
0, /*tp_call*/
0, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
/*tp_flags*/
"Pretend socket type used because Windows lacks socket.fromfd().\n"
"The constructor takes an fd/handle as its argument.\n"
"The instance uses a duplicated copy of the fd/handle.",
/*tp_doc*/
0, /*tp_traverse*/
0, /*tp_clear*/
0, /*tp_richcompare*/
0, /*tp_weaklistoffset*/
0, /*tp_iter*/
0, /*tp_iternext*/
socket_methods, /*tp_methods*/
0, /*tp_members*/
0, /*tp_getset*/
0, /*tp_base*/
0, /*tp_dict*/
0, /*tp_descr_get*/
0, /*tp_descr_set*/
0, /*tp_dictoffset*/
0, /*tp_init*/
0, /*tp_alloc*/
(newfunc)Connection_new, /*tp_new*/
};
#endif /* FALSE_SOCKET */
#endif /* _CONNECTION_H */

@@ -230,3 +230,4 @@ #include "Python.h"

if (length > self->msgsize) {
PyErr_SetString(PyExc_ValueError, "pickled string too long");
PyErr_SetString(PyExc_ValueError, "pickled string too long for a posix"
" queue - try a pipe queue instead");
goto ERR;

@@ -233,0 +234,0 @@ }

@@ -27,1 +27,47 @@ /*

#ifdef MS_WINDOWS
typedef struct {
PyObject_HEAD
SOCKET sock_fd;
int sock_family;
int sock_type;
int sock_proto;
PyObject *(*errorhandler)(void);
double sock_timeout;
} PySocketSockObject;
extern PyTypeObject *socketType;
PyObject *
socket_changefd(PyObject *self, PyObject *args)
{
PySocketSockObject *s;
int family, type, proto=0;
SOCKET fd, newfd;
/* should probably use "n" format for fd on Python 2.5 */
if (!PyArg_ParseTuple(args, "Oiii|i", &s, &fd, &family, &type, &proto))
return NULL;
newfd = _duplicate(fd);
if (newfd == INVALID_SOCKET) {
PyErr_SetString(PyExc_OSError, "failed to duplicate socket handle");
return NULL;
}
if (s->sock_fd != INVALID_SOCKET) {
Py_BEGIN_ALLOW_THREADS
closesocket(s->sock_fd);
Py_END_ALLOW_THREADS
}
s->sock_fd = newfd;
s->sock_family = family;
s->sock_type = type;
s->sock_proto = proto;
Py_RETURN_NONE;
}
#endif

@@ -23,6 +23,7 @@ /*

extern PyTypeObject SocketConnectionType;
extern PyTypeObject FalseSocketType;
PyObject *dumpsFunction, *loadsFunction, *BufferTooShort;
extern PyObject *socket_changefd(PyObject *self, PyObject *args);
PyObject *dumpsFunction, *loadsFunction, *BufferTooShort, *socketType;
/*

@@ -260,2 +261,4 @@ * Definitions to create and manipulate pipe handles

{"changefd", (PyCFunction)socket_changefd, METH_VARARGS, ""},
{NULL} /* Sentinel */

@@ -285,2 +288,12 @@ };

Py_XDECREF(other_module);
/*
* Get copy of `_socket.socket`
*/
other_module = PyImport_ImportModule("_socket");
if (!other_module)
return;
socketType = PyObject_GetAttrString(other_module, "socket");
Py_XDECREF(other_module);

@@ -310,7 +323,2 @@ /*

if (PyType_Ready(&FalseSocketType) < 0)
return;
Py_INCREF(&FalseSocketType);
PyModule_AddObject(m, "falsesocket", (PyObject*)&FalseSocketType);
if (PyType_Ready(&BlockerType) < 0)

@@ -317,0 +325,0 @@ return;

#
# Module implementing synchronization primitives and queues.
# Module implementing synchronization primitives
#

@@ -8,7 +8,5 @@ # processing/synchronize.py

#
# The C extension `_processing` will be used if available.
#
__all__ = [ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
'Event', 'PipeQueue', 'BufferedPipeQueue' ]
'Event']

@@ -18,8 +16,6 @@ import threading

import sys
import process
import _processing
import collections
import itertools
from Queue import Full, Empty
from processing import process, _processing
from struct import pack as _pack, unpack as _unpack, calcsize as _calcsize

@@ -49,22 +45,33 @@ from time import time as _time, sleep as _sleep

def __init__(self, kind, value, name):
def __init__(self, kind, value):
counter = _nextid()
name = '/pys-%s-%s' % (os.getpid(), counter)
self._block = _processing.Blocker(
name=name, create=True, kind=kind, value=value
)
process.debug('creating blocker with name %r' % name)
if name is None:
name = '/pys-%s-%s' % (os.getpid(), counter)
self._block = _processing.Blocker(
name=name, create=True, kind=kind, value=value
)
process.debug('Creating blocker with name %r' % name)
if sys.platform != 'win32':
# On Unix we immediately unlink the name of the
# semaphore since otherwise the semaphore might not
# get removed (till the next reboot) if python gets
# killed. This means that `Blocker` objects are not
# picklable on Unix, but that does not prevent a child
# process from using a `Blocker` object inherited from
# its parent.
self._block._unlink()
if sys.platform != 'win32':
# On Unix we immediately unlink the name of the
# semaphore since otherwise the semaphore might not
# get removed (till the next reboot) if python gets
# killed. This means that `Blocker` objects are not
# picklable on Unix, but that does not prevent a child
# process from using a `Blocker` object inherited from
# its parent.
self._block._unlink()
else:
state = (kind, value, name)
self.__setstate__(state)
def __getstate__(self):
if sys.platform != 'win32':
raise NotImplemented
return self._state
def __setstate__(self, state):
(kind, value, name) = self._state = state
self._initvalue = value
if not hasattr(self, '_block'):
process.debug('opening blocker with name %r' % name)
self._block = _processing.Blocker(

@@ -74,9 +81,2 @@ name=name, create=False, kind=kind, value=value

self._name = name
if kind == BOUNDED_SEMAPHORE:
self._maxvalue = value
else:
self._maxvalue = -1
self._kind = kind
if kind in (MUTEX, RECURSIVE_MUTEX) and sys.platform != 'win32':

@@ -99,5 +99,2 @@ # On Unix a semaphore masquerading as a mutex will not be

def __reduce__(self):
raise NotImplementedError
#

@@ -109,4 +106,4 @@ # Semaphore

def __init__(self, value=1, _name=None):
Blocker.__init__(self, SEMAPHORE, value, _name)
def __init__(self, value=1):
Blocker.__init__(self, SEMAPHORE, value)

@@ -124,7 +121,2 @@ def getValue(self):

if sys.platform == 'win32':
def __reduce__(self):
return (type(self), (-1, self._name))
#

@@ -136,4 +128,4 @@ # Bounded semaphore

def __init__(self, value=1, _name=None):
Blocker.__init__(self, BOUNDED_SEMAPHORE, value, _name)
def __init__(self, value=1):
Blocker.__init__(self, BOUNDED_SEMAPHORE, value)

@@ -143,3 +135,3 @@ def __repr__(self):

return '<BoundedSemaphore(value=%r, maxvalue=%r)>' % \
(self._block._getvalue(), self._maxvalue)
(self._block._getvalue(), self._initvalue)
except (KeyboardInterrupt, SystemExit):

@@ -150,7 +142,2 @@ raise

if sys.platform == 'win32':
def __reduce__(self):
return (type(self), (self._maxvalue, self._name))
#

@@ -162,4 +149,4 @@ # Non-recursive lock -- releasing an unowned lock raises AssertionError

def __init__(self, _name=None):
Blocker.__init__(self, MUTEX, 1, _name)
def __init__(self):
Blocker.__init__(self, MUTEX, 1)

@@ -174,7 +161,2 @@ def __repr__(self):

if sys.platform == 'win32':
def __reduce__(self):
return (type(self), (self._name,))
#

@@ -187,3 +169,3 @@ # Recursive lock

def __init__(self, _name=None):
Blocker.__init__(self, RECURSIVE_MUTEX, 1, _name)
Blocker.__init__(self, RECURSIVE_MUTEX, 1)

@@ -209,7 +191,2 @@ def __repr__(self):

if sys.platform == 'win32':
def __reduce__(self):
return (type(self), (self._name,))
#

@@ -221,10 +198,15 @@ # Condition variable

def __init__(self, lock=None, _extra=None):
self._lock = lock or RLock()
if _extra is None:
_extra = (Semaphore(0), Semaphore(0), Semaphore(0))
self._sleeping_count, self._woken_count, self._wait_semaphore = _extra
def __init__(self, lock=None):
state = (lock or RLock(), Semaphore(0), Semaphore(0), Semaphore(0))
self.__setstate__(state)
def __setstate__(self, state):
(self._lock, self._sleeping_count,
self._woken_count, self._wait_semaphore) = self._state = state
self.acquire = self._lock.acquire
self.release = self._lock.release
def __getstate__(self):
return self._state
def __repr__(self):

@@ -293,8 +275,2 @@ try:

if sys.platform == 'win32':
def __reduce__(self):
return Condition, (self._lock, (self._sleeping_count,
self._woken_count, self._wait_semaphore))
#

@@ -338,298 +314,1 @@ # Event

#
# A Queue bases on a pipe
#
class PipeQueue(object):
def __init__(self, maxsize=0, _extra=None):
self._maxsize = maxsize
if _extra is None:
if sys.platform == 'win32':
from processing.connection import Listener, Client
l = Listener()
reader = Client(l.address)
writer = l.accept()
else:
rfd, wfd = os.pipe()
reader = _processing.SocketConnection(rfd)
writer = _processing.SocketConnection(wfd)
rlock = RLock()
wlock = RLock()
if maxsize <= 0:
wsem = None
else:
wsem = BoundedSemaphore(maxsize)
self._extra = reader, writer, rlock, wlock, wsem
else:
self._extra = reader, writer, rlock, wlock, wsem = _extra
self._wsem = wsem
self._read_acquire = rlock.acquire
self._read_acquire_timeout = rlock._block.acquire_timeout
self._read_release = rlock.release
self._write_acquire = wlock.acquire
self._write_acquire_timeout = wlock._block.acquire_timeout
self._write_release = wlock.release
self._put = writer.send
self._get = reader.recv
self._poll = reader.poll
def get(self, block=1, timeout=None):
if block == 1 and timeout is None:
self._read_acquire()
try:
res = self._get()
if self._wsem:
self._wsem.release()
return res
finally:
self._read_release()
else:
if block == 0:
timeout = 0.0
else:
timeout = max(0.0, timeout)
deadline = _time() + timeout
if not self._read_acquire_timeout(timeout):
raise Empty
try:
timeout = max(0.0, deadline - _time())
if not self._poll(timeout):
raise Empty
res = self._get()
if self._wsem:
self._wsem.release()
return res
finally:
self._read_release()
def put(self, obj, block=1, timeout=None):
if block == 1 and timeout is None:
self._write_acquire()
try:
if self._wsem:
self._wsem.acquire()
self._put(obj)
finally:
self._write_release()
else:
if block == 0:
timeout = 0.0
else:
timeout = max(0.0, timeout)
deadline = _time() + timeout
if not self._write_acquire_timeout(timeout):
raise Full
try:
if self._wsem:
timeout = max(0.0, deadline - _time())
if not self._wsem._block.acquire_timeout(timeout):
raise Full
self._put(obj)
finally:
self._write_release()
def qsize(self):
raise NotImplementedError
def empty(self):
return not self._poll()
def full(self):
return bool(self._wsem) and self._wsem._block._getvalue() == 0
def get_nowait(self, obj):
return self.get(obj, False)
def put_nowait(self, obj):
return self.put(obj, False)
if sys.platform == 'win32':
def __reduce__(self):
return type(self), (self._maxsize, self._extra)
Queue = PipeQueue
#
# Queue based on a posix message queue
#
if hasattr(_processing, 'Queue'):
class PosixQueue(_processing.Queue):
_count = 0
_count_lock = threading.RLock()
_defaults = None
def __init__(self, maxsize=0, msgsize=0, _name=None):
assert maxsize >= 0 and msgsize >= 0
PosixQueue._count_lock.acquire()
try:
PosixQueue._count += 1
count = PosixQueue._count
finally:
PosixQueue._count_lock.release()
if _name is None:
_name = '/pyq-%s-%s' % (os.getpid(), count)
if (maxsize, msgsize) != (0, 0):
defmaxsize, defmsgsize = self.getdefaults()
maxsize = maxsize or defmaxsize
msgsize = msgsize or defmsgsize
_processing.Queue.__init__(self, maxsize, msgsize, _name, True)
# We immediately unlink the name of the queue since
# otherwise the queue might not get removed (till the next
# reboot) if python gets killed. This means that `Queue`
# objects are not picklable, but that does not prevent a
# child process from using a `Queue` object inherited from
# its parent.
self._unlink()
def get_nowait(self):
return self.get(False)
def put_nowait(self, item):
return self.put(item, False)
# staticmethod
def getdefaults():
PosixQueue._count_lock.acquire()
try:
if PosixQueue._defaults is None:
temp = PosixQueue()
PosixQueue._defaults = (temp._maxmsg, temp._msgsize)
temp._close()
return PosixQueue._defaults
finally:
PosixQueue._count_lock.release()
getdefaults = staticmethod(getdefaults)
Queue = PosixQueue
__all__ += ['PosixQueue', 'BufferedPosixQueue']
#
# Buffered versions of queues
#
_sentinel = object()
class _BufferedQueue(object):
def __init__(self, _queue, **kwds):
self._queue = _queue
self._notempty = threading.Condition(threading.Lock())
self._buffer = collections.deque()
self._thread = None
for attr in ('get', 'empty', 'full', 'qsize',
'put_nowait', 'get_nowait'):
method = getattr(self._queue, attr)
setattr(self, attr, method)
def put(self, obj):
self._buffer.append(obj)
self._notempty.acquire()
try:
if self._thread is None:
self._startthread()
self._notempty.notify()
finally:
self._notempty.release()
def putmany(self, iterable):
self._buffer.extend(iterable)
self._notempty.acquire()
try:
if self._thread is None:
self._startthread()
self._notempty.notify()
finally:
self._notempty.release()
def close(self): # will be overwritten by _startthread()
pass
def _startthread(self):
self._thread = threading.Thread(
target=_BufferedQueue._feed,
args=[self._buffer, self._notempty, self._queue.put]
)
self.close = process.Finalize(
self, _BufferedQueue._finalize_queue,
[self._buffer, self._notempty, self._thread],
atexit=True
)
# We make the thread daemonic only to prevent the exit handler
# of `threading` from trying to join it before the finalizer
# `self.close()` gets a chance to stop and join it.
self._thread.setDaemon(True)
self._thread.start()
# staticmethod
def _finalize_queue(buffer, notempty, thread):
process.debug('closing thread used by buffered queue')
notempty.acquire()
try:
buffer.append(_sentinel)
notempty.notify()
finally:
notempty.release()
thread.join()
_finalize_queue = staticmethod(_finalize_queue)
# staticmethod
def _feed(buffer, notempty, put):
while 1:
notempty.acquire()
try:
if not buffer:
notempty.wait()
finally:
notempty.release()
try:
while 1:
obj = buffer.popleft()
if obj is _sentinel:
return
put(obj)
except IndexError:
pass
_feed = staticmethod(_feed)
class BufferedPipeQueue(_BufferedQueue):
def __init__(self, maxsize=0, _queue=None):
self._queue = _queue or PipeQueue(maxsize)
self._maxsize = maxsize
_BufferedQueue.__init__(self, self._queue)
def __reduce__(self):
return (BufferedPipeQueue, (self._maxsize, self._queue))
if hasattr(_processing, 'Queue'):
class BufferedPosixQueue(_BufferedQueue):
def __init__(self, maxsize=0, msgsize=0):
_BufferedQueue.__init__(self, PosixQueue(maxsize, msgsize))

@@ -24,3 +24,3 @@ #

def note(format, *args):
sys.stderr.write('[%s]\t%s\n' % (currentProcess()._name, format % args))
sys.stderr.write('[%s]\t%s\n' % (currentProcess().getName(), format%args))

@@ -27,0 +27,0 @@

@@ -55,10 +55,2 @@ #

class A(object):
# staticmethod
def f():
return 'A.f()'
f = staticmethod(f)
##
def test():

@@ -65,0 +57,0 @@ manager = MyManager()

@@ -94,9 +94,16 @@ #

A = map(pow3, xrange(N))
print '\tmap(pow3, xrange(%d)):\t%s seconds' % (N, time.time() - t)
print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
B = pool.map(pow3, xrange(N))
print '\tpool.map(pow3, xrange(%d)):\t%s seconds' % (N, time.time() - t)
print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
C = list(pool.imap(pow3, xrange(N), chunksize=10000))
print '\tlist(pool.imap(pow3, xrange(%d), chunksize=10000)):\n\t\t%s' \
' seconds' % (N, time.time() - t)
assert A == B, (len(A), len(B))
assert A == B == C, (len(A), len(B), len(C))
print

@@ -110,12 +117,19 @@

A = map(noop, L)
print '\tmap(noop, L):\t\t\t%s seconds' % (time.time() - t)
print '\tmap(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
B = pool.map(noop, L)
print '\tpool.map(noop, L):\t\t%s seconds' % (time.time() - t)
print '\tpool.map(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
assert A == B, (len(A), len(B))
t = time.time()
C = list(pool.imap(noop, L, chunksize=10000))
print '\tlist(pool.imap(noop, L, chunksize=10000)):\n\t\t%s seconds' % \
(time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
del A, B, L
del A, B, C, L

@@ -211,5 +225,5 @@ #

if A == B:
print '\tcallbacks succeeded'
print '\tcallbacks succeeded\n'
else:
print '\t*** callbacks failed\n\t\t%s != %s' % (A, B)
print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)

@@ -223,18 +237,63 @@ #

#
# Check that processes get stopped when pool is deleted
# Check close() methods
#
processes = pool._pool
print 'Testing close():'
for worker in processes:
for worker in pool._pool:
assert worker.isAlive()
del pool
result = pool.apply_async(time.sleep, [0.5])
pool.close()
pool.join()
assert result.get() is None
for worker in pool._pool:
assert not worker.isAlive()
print '\tclose() succeeded\n'
#
# Check terminate() method
#
print 'Testing terminate():'
pool = Pool(2)
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [10]) for i in range(10)]
pool.terminate()
pool.join()
for worker in pool._pool:
assert not worker.isAlive()
print '\tterminate() succeeded\n'
#
# Check garbage collection
#
print 'Testing garbage collection:'
pool = Pool(2)
processes = pool._pool
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [10]) for i in range(10)]
del results, pool
time.sleep(0.2)
for worker in processes:
assert not worker.isAlive()
print '\tgarbage collection succeeded\n'
if __name__ == '__main__':
freezeSupport()
test()

@@ -187,12 +187,8 @@ #

threading.Condition())
if hasattr(processing, 'PosixQueue'):
print '\n\t######## testing processing.PosixQueue\n'
test_queuespeed(processing.Process, processing.PosixQueue(),
processing.Condition())
print '\n\t######## testing processing.BufferedPosixQueue\n'
test_queuespeed(processing.Process, processing.BufferedPosixQueue(),
processing.Condition())
print '\n\t######## testing processing.PipeQueue\n'
test_queuespeed(processing.Process, processing.PipeQueue(),
print '\n\t######## testing processing.Queue\n'
test_queuespeed(processing.Process, processing.Queue(),
processing.Condition())
print '\n\t######## testing processing.SimpleQueue\n'
test_queuespeed(processing.Process, processing.SimpleQueue(),
processing.Condition())
print '\n\t######## testing Queue managed by server process\n'

@@ -203,5 +199,6 @@ test_queuespeed(processing.Process, manager.Queue(),

test_pipespeed()
print '\n\t######## testing processing.BufferedPipeQueue\n'
test_queuespeed(processing.Process, processing.BufferedPipeQueue(),
processing.Condition())
if hasattr(processing, 'PosixQueue'):
print '\n\t######## testing processing.PosixQueue\n'
test_queuespeed(processing.Process, processing.PosixQueue(),
processing.Condition())

@@ -208,0 +205,0 @@ print

@@ -6,3 +6,3 @@ import time, sys, processing

for i in range(50):
for i in range(25):
try:

@@ -54,3 +54,3 @@ time.sleep(0.1)

p.start()
time.sleep(3)
time.sleep(1.5)
p.stop()

@@ -63,3 +63,3 @@ p.join()

p.start()
time.sleep(3)
time.sleep(1.5)
print >>sys.stderr, '\nterminating process'

@@ -66,0 +66,0 @@ p.terminate()

#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Note that a queue produced by `processing.Queue()` will have finite
# capacity meaning that its `put()` method can block. Therefore we
# use a queue using `processing.BufferedQueue()`. This has the
# guarantee that the `put()` (or `putmany()`) method will succeed
# without blocking. Therefore putting all the tasks on this queue
# will not cause a deadlock, regardless of how many tasks there are,
# or whether there is another process which is already consuming the
# tasks.
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue. If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Also notice that the results will probably not come out of the
# output queue in the same in the same order as the corresponding
# tasks were put on the input queue. If it is important to get the
# results back in the original order then consider using `Pool.map()`
# or `Pool.imap()`.
#

@@ -67,8 +58,7 @@ import time

# Create queues
task_queue = BufferedQueue()
task_queue = Queue()
done_queue = Queue()
# Submit tasks -- no deadlock possible since `task_queue` is buffered
for task in TASKS1:
task_queue.put(task)
# Submit tasks
task_queue.putmany(TASKS1)

@@ -84,4 +74,5 @@ # Start worker processes

# Add more tasks -- `putmany()` is an alternative to `put()` and a for-loop
task_queue.putmany(TASKS2)
# Add more tasks using `put()` instead of `putmany()`
for task in TASKS2:
task_queue.put(task)

@@ -88,0 +79,0 @@ # Get and print some more results

@@ -8,5 +8,5 @@ ========

Alexey Akimov, Michele Bertoldi, Josiah Carlson, Lisandro Dalcin,
Markus Gritsch, Doug Hellmann, Charlie Hull, Richard Jones, Kevin
Manley, Dominique Wahli.
Markus Gritsch, Doug Hellmann, Charlie Hull, Richard Jones, Gerald
John M. Manipon, Kevin Manley, Paul Rudin, Dominique Wahli.
Sorry if I have forgotten anyone.