processing
Advanced tools
+358
| # | ||
| # Module implementing queues | ||
| # | ||
| # processing/queue.py | ||
| # | ||
| # Copyright (c) 2006, 2007, R Oudkerk --- see COPYING.txt | ||
| # | ||
| __all__ = ['Queue', 'SimpleQueue'] | ||
| import sys | ||
| import os | ||
| import threading | ||
| import collections | ||
| import time | ||
| import atexit | ||
| import weakref | ||
| from processing import _processing, process, Pipe | ||
| from processing.synchronize import Lock, BoundedSemaphore | ||
| from processing.process import debug, Finalize | ||
| from Queue import Empty, Full | ||
#
# Ensure cleanup func of `processing` runs before that of `threading`
#
# NOTE(review): `atexit` runs handlers in reverse registration order, so
# removing and re-appending the handler moves it to the end of the list,
# making it run first -- before `threading`'s own exit handler.
atexit._exithandlers.remove((process._exit_func, (), {}))
atexit._exithandlers.append((process._exit_func, (), {}))
#
# Queue type based on a pipe -- uses a buffer and a thread
#

class Queue(object):
    '''
    A queue for exchanging objects between processes.

    Objects put on the queue are appended to an in-process buffer; a
    background "feeder" thread drains the buffer into a pipe from which
    other processes read.  When `maxsize` is zero the queue is unbounded
    and `put()` never blocks; otherwise a bounded semaphore limits the
    number of unconsumed items.

    `qsize()`, `task_done()` and `join()` are not supported.
    '''

    def __init__(self, maxsize=0):
        if sys.platform == 'win32':
            # No usable os.pipe() connections on Windows -- emulate a
            # pipe with a listener/client connection pair.
            from processing.connection import Listener, Client
            l = Listener()
            reader = Client(l.address)
            writer = l.accept()
        else:
            rfd, wfd = os.pipe()
            # NOTE(review): the raw descriptors are closed immediately
            # below, which assumes SocketConnection duplicates them.
            reader = _processing.SocketConnection(rfd)
            writer = _processing.SocketConnection(wfd)
            os.close(rfd)
            os.close(wfd)
        rlock = Lock()
        wlock = Lock()
        if maxsize == 0:
            sem = None              # unbounded -- put() never blocks
        else:
            sem = BoundedSemaphore(maxsize)
        state = maxsize, reader, writer, rlock, wlock, sem, os.getpid()
        self.__setstate__(state)

    def __setstate__(self, state):
        # Shared (picklable) state comes from `state`; everything else is
        # per-process and is rebuilt from scratch after unpickling.
        (self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._closed = False
        self._close = None
        self._jointhread = None
        self._send = self._writer.send
        self._recv = self._reader.recv
        self._poll = self._reader.poll
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None

    def __getstate__(self):
        # Only the inter-process parts are pickled; `__setstate__()`
        # recreates the buffer and starts the feeder thread lazily.
        return (self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def put(self, obj, block=True, timeout=None):
        '''
        Put `obj` on the queue.

        For a bounded queue this may block until space is available;
        raises `Full` if non-blocking or if the timeout expires first.
        '''
        assert not self._closed
        if self._maxsize:
            if block and timeout is None:
                if self._sem:
                    self._sem.acquire()
            else:
                if not block:
                    timeout = 0.0
                else:
                    timeout = max(0.0, timeout)
                if not self._sem._block.acquire_timeout(timeout):
                    raise Full
        self._notempty.acquire()
        try:
            if self._thread is None:
                self._startthread()
            self._buffer.append(obj)
            self._notempty.notify()
        finally:
            self._notempty.release()

    def putmany(self, iterable):
        '''
        Put every item of `iterable` on the queue.

        Only valid for unbounded queues (no semaphore bookkeeping).
        '''
        assert not self._closed
        assert self._maxsize == 0
        self._notempty.acquire()
        try:
            if self._thread is None:
                self._startthread()
            self._buffer.extend(iterable)
            self._notempty.notify()
        finally:
            self._notempty.release()

    def get(self, block=True, timeout=None):
        '''
        Remove and return an item from the queue.

        Raises `Empty` if non-blocking or if the timeout expires before
        an item becomes available.
        '''
        if block and timeout is None:
            self._rlock.acquire()
            try:
                res = self._recv()
                if self._sem:
                    self._sem.release()
                return res
            finally:
                self._rlock.release()
        else:
            if not block:
                timeout = 0.0
            else:
                timeout = max(0.0, timeout)
            deadline = time.time() + timeout
            if not self._rlock._block.acquire_timeout(timeout):
                raise Empty
            try:
                # Time spent acquiring the read lock counts against
                # `timeout`, so recompute the remainder for the poll.
                timeout = max(0.0, deadline - time.time())
                if not self._poll(timeout):
                    raise Empty
                res = self._recv()
                if self._sem:
                    self._sem.release()
                return res
            finally:
                self._rlock.release()

    def qsize(self):
        # Cannot be implemented reliably on top of a pipe.
        raise NotImplementedError

    def empty(self):
        '''Return True if there is currently nothing to read from the pipe.'''
        return not self._poll()

    def full(self):
        '''Return True if a bounded queue currently has no free slots.'''
        # Fixed: previously referenced the non-existent `self._wsem`
        # attribute, so full() always raised AttributeError.
        return bool(self._sem) and self._sem._block._getvalue() == 0

    def get_nowait(self):
        '''Equivalent to `get(False)`.'''
        # Fixed: previously declared a spurious `obj` parameter which was
        # forwarded to `get()` as the `block` flag.
        return self.get(False)

    def put_nowait(self, obj):
        '''Equivalent to `put(obj, False)`.'''
        return self.put(obj, False)

    def close(self):
        '''Close the queue; buffered data is still flushed by the feeder.'''
        if self._close:
            self._close()
        self._closed = True

    def jointhread(self):
        '''Join the feeder thread; only valid after `close()`.'''
        assert self._closed
        if self._jointhread:
            self._jointhread()

    def canceljoin(self):
        '''Prevent exit handlers from joining the feeder thread.'''
        # Guarded: a queue that was never written to has no finalizer yet.
        if self._jointhread:
            self._jointhread.cancel()

    def _startthread(self):
        # Start thread which transfers data from buffer to pipe.
        self._thread = threading.Thread(
            target=Queue._feed,
            args=[self._buffer, self._notempty, self._send, self._wlock]
            )
        self._thread.daemon = True
        self._thread.start()
        # Send the sentinel to the feeder thread when the queue object is
        # garbage collected (or at interpreter exit).
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            priority=10, atexit=True
            )
        # Join the feeder thread so buffered data gets flushed to the pipe.
        self._jointhread = Finalize(
            self._thread, Queue._finalize_join,
            [weakref.ref(self._thread)],
            atexit=True
            )
        # Don't bother joining on exit if this process created the queue
        if self._opid == os.getpid():
            self._jointhread.cancel()

    @staticmethod
    def _finalize_join(twr):
        # `twr` is a weak reference to the feeder thread.
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        # Wake the feeder thread and make it exit by queueing the sentinel.
        debug('telling thread used by a buffered queue to quit')
        notempty.acquire()
        try:
            buffer.append(_sentinel)
            notempty.notify()
        finally:
            notempty.release()

    @staticmethod
    def _feed(buffer, notempty, send, writelock):
        # Runs in the feeder thread: move objects from `buffer` into the
        # pipe until the sentinel is seen.
        debug('starting thread to feed data to pipe')
        while True:
            notempty.acquire()
            try:
                if not buffer:
                    notempty.wait()
            finally:
                notempty.release()
            try:
                while True:
                    obj = buffer.popleft()
                    if obj is _sentinel:
                        debug('feeder thread got sentinel -- exiting')
                        return
                    writelock.acquire()
                    try:
                        send(obj)
                    finally:
                        writelock.release()
            except IndexError:
                # Buffer drained -- go back to waiting for new items.
                pass
# Unique marker appended to the buffer by `Queue._finalize_close()`;
# `Queue._feed()` exits when it pops this object.
_sentinel = object()
#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):
    '''
    A minimal inter-process queue: a pipe protected by locks.

    Provides only `get()`, `put()` and `empty()`; there is no support
    for timeouts or non-blocking operations.
    '''

    def __init__(self):
        if sys.platform == 'win32':
            # Emulate a pipe with a listener/client connection pair.
            from processing.connection import Listener, Client
            listener = Listener()
            end_r = Client(listener.address)
            end_w = listener.accept()
            # Message oriented win32 pipes give atomic writes, so no
            # write lock is required.
            self.__setstate__((end_r, end_w, Lock(), None))
        else:
            fd_r, fd_w = os.pipe()
            end_r = _processing.SocketConnection(fd_r)
            end_w = _processing.SocketConnection(fd_w)
            os.close(fd_r)
            os.close(fd_w)
            self.__setstate__((end_r, end_w, Lock(), Lock()))

    def empty(self):
        '''Return True if there is currently nothing to read from the pipe.'''
        return not self._reader.poll()

    def __setstate__(self, state):
        # Rebuild the `get`/`put` closures from the (un)pickled state.
        self._reader, self._writer, self._rlock, self._wlock = state

        reader_recv = self._reader.recv
        read_lock = self._rlock

        def get():
            read_lock.acquire()
            try:
                return reader_recv()
            finally:
                read_lock.release()

        self.get = get

        writer_send = self._writer.send
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self.put = writer_send
        else:
            write_lock = self._wlock

            def put(obj):
                write_lock.acquire()
                try:
                    return writer_send(obj)
                finally:
                    write_lock.release()

            self.put = put

    def __getstate__(self):
        # state will only be picklable on windows
        return self._reader, self._writer, self._rlock, self._wlock
#
# Queue type based on a posix queue
#

if hasattr(_processing, 'Queue'):

    class PosixQueue(_processing.Queue):
        '''
        Queue type implemented on top of a POSIX message queue.

        Only defined when the `_processing` extension exposes `Queue`,
        i.e. on platforms supporting POSIX message queues.
        '''
        # Class-wide state, all protected by `_count_lock`:
        # `_count` numbers the queues created by this process (used to
        # build unique names) and `_defaults` caches the result of
        # `getdefaults()`.
        _count = 0
        _count_lock = threading.RLock()
        _defaults = None

        def __init__(self, maxsize=0, msgsize=0):
            assert maxsize >= 0 and msgsize >= 0
            # Take a per-process sequence number for a unique queue name.
            PosixQueue._count_lock.acquire()
            try:
                PosixQueue._count += 1
                count = PosixQueue._count
            finally:
                PosixQueue._count_lock.release()
            _name = '/pyq-%s-%s' % (os.getpid(), count)
            if (maxsize, msgsize) != (0, 0):
                # Fill whichever of maxsize/msgsize was left as zero with
                # the system default.  When BOTH are zero this branch is
                # skipped and the zeros are passed straight through
                # (presumably the extension applies its own defaults --
                # this also avoids recursing via `getdefaults()`, which
                # itself constructs a `PosixQueue()` with two zeros).
                defmaxsize, defmsgsize = self.getdefaults()
                maxsize = maxsize or defmaxsize
                msgsize = msgsize or defmsgsize
            _processing.Queue.__init__(self, maxsize, msgsize, _name, True)
            # We immediately unlink the name of the queue since
            # otherwise the queue might not get removed (till the next
            # reboot) if python gets killed.  This means that `Queue`
            # objects are not picklable, but that does not prevent a
            # child process from using a `Queue` object inherited from
            # its parent.
            self._unlink()

        def get_nowait(self):
            # Equivalent to `get(False)`.
            return self.get(False)

        def put_nowait(self, item):
            # Equivalent to `put(item, False)`.
            return self.put(item, False)

        @staticmethod
        def getdefaults():
            '''
            Return the default `(maxmsg, msgsize)` pair, probing it via a
            temporary queue on first use and caching the result.
            '''
            PosixQueue._count_lock.acquire()
            try:
                if PosixQueue._defaults is None:
                    temp = PosixQueue()
                    PosixQueue._defaults = (temp._maxmsg, temp._msgsize)
                    temp._close()
                return PosixQueue._defaults
            finally:
                PosixQueue._count_lock.release()

    __all__ += ['PosixQueue']
+14
-26
@@ -39,3 +39,3 @@ # | ||
| __version__ = '0.37' | ||
| __version__ = '0.38' | ||
@@ -85,3 +85,3 @@ __all__ = [ | ||
| from process import freezeSupport, ProcessExit, Finalize | ||
| from process import getLogger, enableLogging | ||
| from process import getLogger, enableLogging, note | ||
@@ -153,5 +153,4 @@ # | ||
| __all__ += [ | ||
| 'LocalManager', 'Lock', 'RLock', 'Semaphore', | ||
| 'BoundedSemaphore', 'Condition', 'Event', | ||
| 'PipeQueue', 'BufferedPipeQueue', 'Queue', 'BufferedQueue' | ||
| 'LocalManager', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', | ||
| 'Condition', 'Event', 'Queue', 'SimpleQueue' | ||
| ] | ||
@@ -208,18 +207,17 @@ | ||
| def PipeQueue(maxsize=0): | ||
| def Queue(maxsize=0): | ||
| ''' | ||
| Returns a queue object implemented using a pipe | ||
| ''' | ||
| from processing.synchronize import PipeQueue | ||
| return PipeQueue(maxsize) | ||
| from processing.queue import Queue | ||
| return Queue(maxsize) | ||
| def BufferedPipeQueue(maxsize=0): | ||
| def SimpleQueue(): | ||
| ''' | ||
| Returns a queue for which `put()` always succeeds without blocking | ||
| Returns a simplified queue object implemented using a pipe | ||
| ''' | ||
| from processing.synchronize import BufferedPipeQueue | ||
| return BufferedPipeQueue(maxsize) | ||
| from processing.queue import SimpleQueue | ||
| return SimpleQueue() | ||
| Queue = PipeQueue | ||
| BufferedQueue = BufferedPipeQueue | ||
| PipeQueue = BufferedPipeQueue = BufferedQueue = Queue # deprecated names | ||
@@ -229,3 +227,3 @@ | ||
| __all__ += ['PosixQueue', 'BufferedPosixQueue'] | ||
| __all__ += ['PosixQueue'] | ||
@@ -236,13 +234,3 @@ def PosixQueue(maxsize=0, msgsize=0): | ||
| ''' | ||
| from processing.synchronize import PosixQueue | ||
| from processing.queue import PosixQueue | ||
| return PosixQueue(maxsize, msgsize) | ||
| def BufferedPosixQueue(maxsize=0, msgsize=0): | ||
| ''' | ||
| Returns a queue for which `put()` always succeeds without blocking | ||
| ''' | ||
| from processing.synchronize import BufferedPosixQueue | ||
| return BufferedPosixQueue(maxsize) | ||
| Queue = PosixQueue | ||
| BufferedQueue = BufferedPosixQueue |
+59
-0
@@ -5,2 +5,61 @@ ======================================== | ||
| Changes in 0.38 | ||
| --------------- | ||
| * Have revamped the queue types. Now the queue types are | ||
| `Queue`, `SimpleQueue` and (on systems which support it) | ||
| `PosixQueue`. | ||
| Now `Queue` should behave just like Python's normal `Queue.Queue` | ||
| class except that `qsize()`, `task_done()` and `join()` are not | ||
| implemented. In particular, if no maximum size was specified when | ||
| the queue was created then `put()` will always succeed without | ||
| blocking. | ||
| A `SimpleQueue` instance is really just a pipe protected by a couple | ||
| of locks. It has `get()`, `put()` and `empty()` methods but does | ||
| not support timeouts or non-blocking. | ||
| `BufferedPipeQueue()` and `PipeQueue()` remain as deprecated | ||
| aliases of `Queue()` but `BufferedPosixQueue()` has been removed. | ||
| (Not sure if we really need to keep `PosixQueue()`...) | ||
| * Previously the `Pool.shutdown()` method was a little dodgy -- it | ||
| could block indefinitely if `map()` or `imap*()` were used and did | ||
| not try to terminate workers while they were doing a task. | ||
| Now there are three new methods `close()`, `terminate()` and | ||
| `join()` -- `shutdown()` is retained as a deprecated alias of | ||
| `terminate()`. Thanks to Gerald John M. Manipon for feature | ||
| request/suggested patch to `shutdown()`. | ||
| * `Pool.imap()` and `Pool.imap_unordered()` have gained a `chunksize` | ||
| argument which allows the iterable to be submitted to the pool in | ||
| chunks. Choosing `chunksize` appropriately makes `Pool.imap()` | ||
| almost as fast as `Pool.map()` even for long iterables and cheap | ||
| functions. | ||
| * Previously on Windows when the cleanup code for a `LocalManager` | ||
| attempts to unlink the name of the file which backs the shared | ||
| memory map an exception is raised if a child process still exists | ||
| which has a handle open for that mmap. This is likely to happen if | ||
| a daemon process inherits a `LocalManager` instance. | ||
| Now the parent process will remember the filename and attempt to | ||
| unlink the file name again once all the child processes have been | ||
| joined or terminated. Reported by Paul Rudin. | ||
| * `types.MethodType` is registered with `copy_reg` so now instance | ||
| methods and class methods should be picklable. (Unfortunately there is | ||
| no obvious way of supporting the pickling of staticmethods since | ||
| they are not marked with the class in which they were defined.) | ||
| This means that on Windows it is now possible to use an instance | ||
| method or class method as the target callable of a Process object. | ||
| * On Windows `reduction.fromfd()` now returns true instances of | ||
| `_socket.socket`, so there is no more need for the | ||
| `_processing.falsesocket` type. | ||
| Changes in 0.37 | ||
@@ -7,0 +66,0 @@ --------------- |
+7
-9
@@ -19,3 +19,3 @@ # | ||
| from cPickle import dumps, loads | ||
| from processing.process import Finalize, currentProcess, log | ||
| from processing.process import Finalize, currentProcess, subdebug | ||
@@ -301,3 +301,3 @@ # | ||
| log(5, 'listener has bound address %r', self._address) | ||
| subdebug('listener has bound address %r', self._address) | ||
@@ -316,5 +316,5 @@ self.close = Finalize( | ||
| # staticmethod | ||
| @staticmethod | ||
| def _finalize_socketlistener(address, family, sock): | ||
| log(5, 'closing listener with address=%r', address) | ||
| subdebug('closing listener with address=%r', address) | ||
| sock.close() | ||
@@ -326,3 +326,2 @@ if family == 'AF_UNIX': | ||
| pass | ||
| _finalize_socketlistener = staticmethod(_finalize_socketlistener) | ||
@@ -374,3 +373,3 @@ def SocketClient(address): | ||
| log(5, 'listener created with address=%r', self._address) | ||
| subdebug('listener created with address=%r', self._address) | ||
@@ -392,8 +391,7 @@ self.close = Finalize( | ||
| # staticmethod | ||
| @staticmethod | ||
| def _finalize_pipelistener(queue, address): | ||
| log(5, 'closing listener with address=%r', address) | ||
| subdebug('closing listener with address=%r', address) | ||
| for handle in queue: | ||
| _processing.CloseHandle(handle) | ||
| _finalize_pipelistener = staticmethod(_finalize_pipelistener) | ||
@@ -400,0 +398,0 @@ def PipeClient(address): |
+16
-21
@@ -98,3 +98,3 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| <dl class="docutils"> | ||
| <dt><strong>Queues</strong></dt> | ||
| <dt><strong>Queues</strong>:</dt> | ||
| <dd><p class="first">The function <tt class="docutils literal"><span class="pre">Queue()</span></tt> returns a near clone of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> | ||
@@ -122,3 +122,3 @@ -- see the Python standard documentation. For example</p> | ||
| </dd> | ||
| <dt><strong>Pipes</strong></dt> | ||
| <dt><strong>Pipes</strong>:</dt> | ||
| <dd><p class="first">The <tt class="docutils literal"><span class="pre">Pipe()</span></tt> function returns a pair of connection objects | ||
@@ -292,9 +292,7 @@ connected by a duplex (two-way) pipe, for example</p> | ||
| not significantly slower than passing the same objects between threads | ||
| using <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt>. (In fact on Linux it tends to be faster.) One | ||
| can also use <tt class="docutils literal"><span class="pre">processing.Pipe</span></tt> which at least on Windows is quite a | ||
| bit faster than <tt class="docutils literal"><span class="pre">processing.Queue</span></tt>.</p> | ||
| using <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt>. One can also use <tt class="docutils literal"><span class="pre">processing.Pipe</span></tt>.</p> | ||
| <p>The following benchmarks were performed on a single core Pentium 4, | ||
| 2.5Ghz laptop running Windows XP and Ubuntu Linux 6.10 --- see | ||
| <a class="reference" href="../test/test_speed.py">test_speed.py</a>.</p> | ||
| <p><em>Number of 256 byte strings passed between processes/threads per sec</em>:</p> | ||
| <p><em>Number of 256 byte string objects passed between processes/threads per sec</em>:</p> | ||
| <table border="1" class="docutils"> | ||
@@ -315,12 +313,16 @@ <colgroup> | ||
| <td>49,000</td> | ||
| <td>17,000-50,000 <a class="footnote-reference" href="#id4" id="id1" name="id1">[1]</a></td> | ||
| <td>17,000-50,000 <a class="footnote-reference" href="#id2" id="id1" name="id1">[1]</a></td> | ||
| </tr> | ||
| <tr><td>processing.Queue</td> | ||
| <td>22,000</td> | ||
| <td>21,000</td> | ||
| </tr> | ||
| <tr><td>processing.SimpleQueue</td> | ||
| <td>33,000</td> | ||
| <td>43,000</td> | ||
| </tr> | ||
| <tr><td>processing.PosixQueue</td> | ||
| <td>n/a</td> | ||
| <td>62,000 <a class="footnote-reference" href="#id5" id="id2" name="id2">[2]</a></td> | ||
| <td>62,000</td> | ||
| </tr> | ||
| <tr><td>processing.PipeQueue</td> | ||
| <td>27,000</td> | ||
| <td>37,000 <a class="footnote-reference" href="#id5" id="id3" name="id3">[2]</a></td> | ||
| </tr> | ||
| <tr><td>Queue managed by server</td> | ||
@@ -336,16 +338,9 @@ <td>6,900</td> | ||
| </table> | ||
| <table class="docutils footnote" frame="void" id="id4" rules="none"> | ||
| <table class="docutils footnote" frame="void" id="id2" rules="none"> | ||
| <colgroup><col class="label" /><col /></colgroup> | ||
| <tbody valign="top"> | ||
| <tr><td class="label"><a class="fn-backref" href="#id1" name="id4">[1]</a></td><td>For some reason the performance of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> is very | ||
| <tr><td class="label"><a class="fn-backref" href="#id1" name="id2">[1]</a></td><td>For some reason the performance of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> is very | ||
| variable on Linux.</td></tr> | ||
| </tbody> | ||
| </table> | ||
| <table class="docutils footnote" frame="void" id="id5" rules="none"> | ||
| <colgroup><col class="label" /><col /></colgroup> | ||
| <tbody valign="top"> | ||
| <tr><td class="label"><a name="id5">[2]</a></td><td><em>(<a class="fn-backref" href="#id2">1</a>, <a class="fn-backref" href="#id3">2</a>)</em> <tt class="docutils literal"><span class="pre">processing.Queue</span></tt> is an alias for <tt class="docutils literal"><span class="pre">processing.PosixQueue</span></tt> if | ||
| available or <tt class="docutils literal"><span class="pre">processing.PipeQueue</span></tt>.</td></tr> | ||
| </tbody> | ||
| </table> | ||
| <p><em>Number of acquires/releases of a lock per sec</em>:</p> | ||
@@ -352,0 +347,0 @@ <table border="1" class="docutils"> |
+5
-8
@@ -306,5 +306,3 @@ .. include:: header.txt | ||
| not significantly slower than passing the same objects between threads | ||
| using `Queue.Queue`. (In fact on Linux it tends to be faster.) One | ||
| can also use `processing.Pipe` which at least on Windows is quite a | ||
| bit faster than `processing.Queue`. | ||
| using `Queue.Queue`. One can also use `processing.Pipe`. | ||
@@ -316,3 +314,3 @@ The following benchmarks were performed on a single core Pentium 4, | ||
| *Number of 256 byte strings passed between processes/threads per sec*: | ||
| *Number of 256 byte string objects passed between processes/threads per sec*: | ||
@@ -323,4 +321,5 @@ ================================== ========== ================== | ||
| Queue.Queue 49,000 17,000-50,000 [1]_ | ||
| processing.PosixQueue n/a 62,000 [2]_ | ||
| processing.PipeQueue 27,000 37,000 [2]_ | ||
| processing.Queue 22,000 21,000 | ||
| processing.SimpleQueue 33,000 43,000 | ||
| processing.PosixQueue n/a 62,000 | ||
| Queue managed by server 6,900 6,500 | ||
@@ -332,4 +331,2 @@ processing.Pipe 52,000 57,000 | ||
| variable on Linux. | ||
| .. [2] `processing.Queue` is an alias for `processing.PosixQueue` if | ||
| available or `processing.PipeQueue`. | ||
@@ -336,0 +333,0 @@ |
+28
-17
@@ -44,3 +44,5 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| accepts a single argument. When the result becomes ready | ||
| <tt class="docutils literal"><span class="pre">callback</span></tt> is applied to it (unless the call failed).</p> | ||
| <tt class="docutils literal"><span class="pre">callback</span></tt> is applied to it (unless the call failed). | ||
| <tt class="docutils literal"><span class="pre">callback</span></tt> should complete immediately since otherwise the | ||
| thread which handles the results will get blocked.</p> | ||
| </dd> | ||
@@ -52,3 +54,3 @@ <dt><tt class="docutils literal"><span class="pre">map(func,</span> <span class="pre">iterable,</span> <span class="pre">chunksize=None)</span></tt></dt> | ||
| it submits to the process pool as separate tasks. The | ||
| (maximum) size of these chunks can be specified by setting | ||
| (approximate) size of these chunks can be specified by setting | ||
| <tt class="docutils literal"><span class="pre">chunksize</span></tt> to a positive integer.</p> | ||
@@ -61,17 +63,19 @@ </dd> | ||
| accepts a single argument. When the result becomes ready | ||
| <tt class="docutils literal"><span class="pre">callback</span></tt> is applied to it (unless the call failed).</p> | ||
| <tt class="docutils literal"><span class="pre">callback</span></tt> is applied to it (unless the call failed). | ||
| <tt class="docutils literal"><span class="pre">callback</span></tt> should complete immediately since otherwise the | ||
| thread which handles the results will get blocked.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">imap(func,</span> <span class="pre">iterable)</span></tt></dt> | ||
| <dt><tt class="docutils literal"><span class="pre">imap(func,</span> <span class="pre">iterable,</span> <span class="pre">chunksize=1)</span></tt></dt> | ||
| <dd><p class="first">An equivalent of <tt class="docutils literal"><span class="pre">itertools.imap()</span></tt>.</p> | ||
| <p>Note that this treats each element of the iterable as a | ||
| separate task, so for a long iterable where the function is | ||
| cheap to evaluate this likely to be <strong>much</strong> slower than using | ||
| <tt class="docutils literal"><span class="pre">map()</span></tt>.</p> | ||
| <p class="last">Also notice that the <tt class="docutils literal"><span class="pre">next(timeout=None)</span></tt> method of the | ||
| iterator returned by the <tt class="docutils literal"><span class="pre">imap()</span></tt> method has a timeout | ||
| parameter. <tt class="docutils literal"><span class="pre">next(timeout)</span></tt> will raise | ||
| <p>The <tt class="docutils literal"><span class="pre">chunksize</span></tt> argument is the same as the one used by the | ||
| <tt class="docutils literal"><span class="pre">map()</span></tt> method. For very long iterables using a large value | ||
| for <tt class="docutils literal"><span class="pre">chunksize</span></tt> can make the job complete <strong>much</strong> faster | ||
| than using the default value of <tt class="docutils literal"><span class="pre">1</span></tt>.</p> | ||
| <p class="last">Also if <tt class="docutils literal"><span class="pre">chunksize</span></tt> is <tt class="docutils literal"><span class="pre">1</span></tt> then the <tt class="docutils literal"><span class="pre">next()</span></tt> method of the | ||
| iterator returned by the <tt class="docutils literal"><span class="pre">imap()</span></tt> method has a <tt class="docutils literal"><span class="pre">timeout</span></tt> | ||
| parameter: <tt class="docutils literal"><span class="pre">next(timeout)</span></tt> will raise | ||
| <tt class="docutils literal"><span class="pre">processing.TimeoutError</span></tt> if the result cannot be returned | ||
| with <tt class="docutils literal"><span class="pre">timeout</span></tt> seconds.</p> | ||
| within <tt class="docutils literal"><span class="pre">timeout</span></tt> seconds.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">imap_unordered(func,</span> <span class="pre">iterable)</span></tt></dt> | ||
| <dt><tt class="docutils literal"><span class="pre">imap_unordered(func,</span> <span class="pre">iterable,</span> <span class="pre">chunksize=1)</span></tt></dt> | ||
| <dd>The same as <tt class="docutils literal"><span class="pre">imap()</span></tt> except that the ordering of the results | ||
@@ -81,6 +85,13 @@ from the returned iterator should be considered arbitrary. | ||
| guaranteed to be "correct".)</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">shutdown()</span></tt></dt> | ||
| <dd>Tells the worker processes to shutdown. Gets called | ||
| automatically when the pool object is garbage collected or | ||
| when the process shuts down.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt> | ||
| <dd>Prevents any more tasks from being submitted to the pool. | ||
| Once all the tasks have been completed the worker processes | ||
| will exit.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">terminate()</span></tt></dt> | ||
| <dd>Stops the worker processes immediately without completing | ||
| outstanding work. When the pool object is garbage collected | ||
| <tt class="docutils literal"><span class="pre">terminate()</span></tt> will be called immediately.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">join()</span></tt></dt> | ||
| <dd>Wait for the worker processes to exit. One must call | ||
| <tt class="docutils literal"><span class="pre">close()</span></tt> or <tt class="docutils literal"><span class="pre">terminate()</span></tt> before using <tt class="docutils literal"><span class="pre">join()</span></tt>.</dd> | ||
| </dl> | ||
@@ -87,0 +98,0 @@ </blockquote> |
+28
-15
@@ -37,2 +37,4 @@ .. include:: header.txt | ||
| `callback` is applied to it (unless the call failed). | ||
| `callback` should complete immediately since otherwise the | ||
| thread which handles the results will get blocked. | ||
@@ -45,3 +47,3 @@ `map(func, iterable, chunksize=None)` | ||
| it submits to the process pool as separate tasks. The | ||
| (maximum) size of these chunks can be specified by setting | ||
| (approximate) size of these chunks can be specified by setting | ||
| `chunksize` to a positive integer. | ||
@@ -56,18 +58,20 @@ | ||
| `callback` is applied to it (unless the call failed). | ||
| `callback` should complete immediately since otherwise the | ||
| thread which handles the results will get blocked. | ||
| `imap(func, iterable)` | ||
| `imap(func, iterable, chunksize=1)` | ||
| An equivalent of `itertools.imap()`. | ||
| Note that this treats each element of the iterable as a | ||
| separate task, so for a long iterable where the function is | ||
| cheap to evaluate this likely to be **much** slower than using | ||
| `map()`. | ||
| The `chunksize` argument is the same as the one used by the | ||
| `map()` method. For very long iterables using a large value | ||
| for `chunksize` can make the job complete **much** faster | ||
| than using the default value of `1`. | ||
| Also notice that the `next(timeout=None)` method of the | ||
| iterator returned by the `imap()` method has a timeout | ||
| parameter. `next(timeout)` will raise | ||
| Also if `chunksize` is `1` then the `next()` method of the | ||
| iterator returned by the `imap()` method has an optional | ||
| `timeout` parameter: `next(timeout)` will raise | ||
| `processing.TimeoutError` if the result cannot be returned | ||
| with `timeout` seconds. | ||
| within `timeout` seconds. | ||
| `imap_unordered(func, iterable)` | ||
| `imap_unordered(func, iterable, chunksize=1)` | ||
| The same as `imap()` except that the ordering of the results | ||
@@ -78,8 +82,17 @@ from the returned iterator should be considered arbitrary. | ||
| `shutdown()` | ||
| Tells the worker processes to shutdown. Gets called | ||
| automatically when the pool object is garbage collected or | ||
| when the process shuts down. | ||
| `close()` | ||
| Prevents any more tasks from being submitted to the pool. | ||
| Once all the tasks have been completed the worker processes | ||
| will exit. | ||
| `terminate()` | ||
| Stops the worker processes immediately without completing | ||
| outstanding work. When the pool object is garbage collected | ||
| `terminate()` will be called immediately. | ||
| `join()` | ||
| Wait for the worker processes to exit. One must call | ||
| `close()` or `terminate()` before using `join()`. | ||
| Asynchronous result objects | ||
@@ -86,0 +99,0 @@ =========================== |
+70
-62
@@ -31,3 +31,3 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| <dt><strong>exception</strong> <tt class="docutils literal"><span class="pre">BufferTooShort</span></tt></dt> | ||
| <dd><p class="first">Exception raise by the <tt class="docutils literal"><span class="pre">recvbytes_into()</span></tt> method of a | ||
| <dd><p class="first">Exception raised by the <tt class="docutils literal"><span class="pre">recvbytes_into()</span></tt> method of a | ||
| <a class="reference" href="connection-ref.html#connection-objects">connection object</a> | ||
@@ -76,71 +76,79 @@ when the supplied buffer object is too small for the message | ||
| same time.</p> | ||
| <p>Also note that if a process is killed while it is trying to | ||
| read or write to a pipe then the data in the pipe is likely to | ||
| become corrupted (because it may become impossible to be sure | ||
| where the message boundaries lie).</p> | ||
| <p class="last">On Windows this requires the <tt class="docutils literal"><span class="pre">_processing</span></tt> extension.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">Queue(maxsize=0)</span></tt></dt> | ||
| <dd><p class="first">Alias for either <tt class="docutils literal"><span class="pre">PosixQueue</span></tt> if available or else <tt class="docutils literal"><span class="pre">PipeQueue</span></tt> | ||
| -- see below.</p> | ||
| <p class="last">It differs from Python standard <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> type by only | ||
| have finite capacity, so even when <tt class="docutils literal"><span class="pre">maxsize</span></tt> is specified as | ||
| <tt class="docutils literal"><span class="pre">0</span></tt> the <tt class="docutils literal"><span class="pre">put()</span></tt> method might block. If you need to be sure | ||
| that <tt class="docutils literal"><span class="pre">put()</span></tt> will not block then you should use | ||
| <tt class="docutils literal"><span class="pre">BufferedQueue()</span></tt> instead.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">BufferedQueue()</span></tt></dt> | ||
| <dd><p class="first">Alias for either <tt class="docutils literal"><span class="pre">BufferedPosixQueue</span></tt> if available or else | ||
| <tt class="docutils literal"><span class="pre">BufferedPipeQueue</span></tt>.</p> | ||
| <p>Differs from <tt class="docutils literal"><span class="pre">Queue()</span></tt> by guaranteeing that the <tt class="docutils literal"><span class="pre">put()</span></tt> method | ||
| will not block. A buffered queue tends to be slower than a | ||
| normal queue if you are putting items on the queue one by one | ||
| (rather than using <tt class="docutils literal"><span class="pre">putmany()</span></tt>).</p> | ||
| <p>The first time a process puts an item on a buffered queue a | ||
| thread is started whose job is to transfer items from a buffer | ||
| onto the true interprocess queue. In addition to the usual | ||
| queue methods <tt class="docutils literal"><span class="pre">BufferedQueue</span></tt> supports two extra:</p> | ||
| <blockquote class="last"> | ||
| <dd><p class="first">Returns a process shared queue implemented using a pipe and a | ||
| few locks/semaphores. A background thread transfers objects | ||
| from a buffer into the pipe.</p> | ||
| <p>It is a near clone of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> except that the <tt class="docutils literal"><span class="pre">qsize()</span></tt> | ||
| method is not implemented and that the <tt class="docutils literal"><span class="pre">task_done()</span></tt> and | ||
| <tt class="docutils literal"><span class="pre">join()</span></tt> methods introduced in Python 2.5 are also missing.</p> | ||
| <p><tt class="docutils literal"><span class="pre">Queue</span></tt> has a few additional methods:</p> | ||
| <blockquote> | ||
| <dl class="docutils"> | ||
| <dt><tt class="docutils literal"><span class="pre">putmany(iterable)</span></tt></dt> | ||
| <dd>Adds all items in the iterable to the queue's buffer. | ||
| So <tt class="docutils literal"><span class="pre">q.putmany(X)</span></tt> is a faster equivalent of | ||
| <tt class="docutils literal"><span class="pre">for</span> <span class="pre">x</span> <span class="pre">in</span> <span class="pre">X:</span> <span class="pre">q.put(x)</span></tt>.</dd> | ||
| <dd>If the queue has infinite size then this adds all | ||
| items in the iterable to the queue's buffer. So | ||
| <tt class="docutils literal"><span class="pre">q.putmany(X)</span></tt> is a faster alternative to <tt class="docutils literal"><span class="pre">for</span> <span class="pre">x</span> <span class="pre">in</span> <span class="pre">X:</span> | ||
| <span class="pre">q.put(x)</span></tt>. Raises an error if the queue has finite | ||
| size.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt> | ||
| <dd>Flushes data from the buffer to the interprocess | ||
| queue, then instructs the thread to stop and waits for | ||
| it to do so. This will be called automatically when | ||
| the queue is garbage collected or when the process | ||
| exits.</dd> | ||
| <dd>Indicates that no more data will be put on this queue | ||
| by the current process. The background thread will | ||
| quit once it has flushed all its data. This is called | ||
| automatically when the queue is garbage collected.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">jointhread()</span></tt></dt> | ||
| <dd><p class="first">This joins the background thread and can only be used | ||
| after <tt class="docutils literal"><span class="pre">close()</span></tt> has been called. This blocks until | ||
| the background thread exits, ensuring that all data in | ||
| the buffer has been flushed to the pipe.</p> | ||
| <p class="last">By default if a process is not the creator of the | ||
| queue then on exit it will attempt to join the queue's | ||
| background thread. The process can call | ||
| <tt class="docutils literal"><span class="pre">canceljoin()</span></tt> to prevent this behaviour.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">canceljoin()</span></tt></dt> | ||
| <dd>Prevents the background thread from being joined | ||
| automatically when the process exits. Unnecessary if | ||
| the current process created the queue.</dd> | ||
| </dl> | ||
| </blockquote> | ||
| <p class="last">Note that if a process is killed while it is trying to receive | ||
| or send to a queue then the data in the queue is likely to | ||
| become corrupted (because it may become impossible to be sure | ||
| where the message boundaries lie).</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">PipeQueue(maxsize=0)</span></tt></dt> | ||
| <dd><p class="first">Returns a near clone of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> except that the | ||
| <tt class="docutils literal"><span class="pre">qsize()</span></tt> method is not implemented. It is implemented using | ||
| a pipe and some locks/semaphores.</p> | ||
| <p>On Unix if a client terminates while it is reading or writing | ||
| from the queue, other clients reading from the queue may lose | ||
| track of where message boundaries are or may retrieve | ||
| incomplete messages. At least on Windows a keyboard interrupt | ||
| (SIGINT) or the use of a process's <tt class="docutils literal"><span class="pre">stop()</span></tt> method should not | ||
| cause such a problem.</p> | ||
| <p>Placing an object on a <tt class="docutils literal"><span class="pre">PipeQueue</span></tt> can block because of lack | ||
| of buffer space even if a zero timeout is used.</p> | ||
| <p class="last">Requires native semaphore support from <tt class="docutils literal"><span class="pre">_processing</span></tt>.</p> | ||
| <dt><tt class="docutils literal"><span class="pre">SimpleQueue()</span></tt></dt> | ||
| <dd><p class="first">A simplified and faster alternative to <tt class="docutils literal"><span class="pre">Queue()</span></tt>. It is | ||
| really just a pipe protected by a couple of locks.</p> | ||
| <p>It has <tt class="docutils literal"><span class="pre">get()</span></tt> and <tt class="docutils literal"><span class="pre">put()</span></tt> methods but these do not have | ||
| <tt class="docutils literal"><span class="pre">block</span></tt> or <tt class="docutils literal"><span class="pre">timeout</span></tt> arguments. It also has an <tt class="docutils literal"><span class="pre">empty()</span></tt> | ||
| method.</p> | ||
| <p class="last"><strong>Warning</strong>: Unlike with <tt class="docutils literal"><span class="pre">Queue</span></tt> (when <tt class="docutils literal"><span class="pre">maxsize</span></tt> is zero) | ||
| using <tt class="docutils literal"><span class="pre">put()</span></tt> may block if the pipe's buffer does not have | ||
| sufficient space, so one must take care that no deadlocks are | ||
| possible.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">PosixQueue(maxsize=0,</span> <span class="pre">msgsize=0)</span></tt></dt> | ||
| <dd><p class="first">Returns a near clone of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> implemented using a (Unix | ||
| only) posix message queue.</p> | ||
| <p>If <tt class="docutils literal"><span class="pre">maxsize</span></tt> is non-zero it determines the maximum number of | ||
| messages that can be in the queue and if <tt class="docutils literal"><span class="pre">msgsize</span></tt> is non-zero | ||
| it determines the maximum size in bytes of a message. If | ||
| either is zero then the system default (which is finite) is | ||
| used. (For instance on my Linux system the default maximum | ||
| number of messages in a queue is 10 and the maximum message | ||
| size is 8192 bytes.) A <tt class="docutils literal"><span class="pre">PosixQueue</span></tt> object has attributes | ||
| <tt class="docutils literal"><span class="pre">_maxmsg</span></tt> and <tt class="docutils literal"><span class="pre">_maxsize</span></tt> which give these limits for that | ||
| queue.</p> | ||
| <p>Note that if <tt class="docutils literal"><span class="pre">maxsize</span></tt> or <tt class="docutils literal"><span class="pre">msgsize</span></tt> is larger than the system | ||
| maximum then an <tt class="docutils literal"><span class="pre">OSError</span></tt> exception will be thrown. On Linux | ||
| the system maximums can be viewed and modified through the | ||
| <tt class="docutils literal"><span class="pre">/proc</span></tt> filesystem --- see <tt class="docutils literal"><span class="pre">man</span> <span class="pre">7</span> <span class="pre">mq_overview</span></tt>.</p> | ||
| <p class="last">Only available on Unix and only if support for posix queues | ||
| was built in to <tt class="docutils literal"><span class="pre">_processing</span></tt>.</p> | ||
| <dd><p class="first">A faster alternative to <tt class="docutils literal"><span class="pre">Queue()</span></tt> which is available on Unix | ||
| systems which support Posix message queues.</p> | ||
| <p>However, posix queues have a maximum number of messages that | ||
| can occupy the queue at a given time, and each message (when | ||
| expressed as a pickled string) has a maximum length --- see | ||
| <tt class="docutils literal"><span class="pre">man</span> <span class="pre">7</span> <span class="pre">mq_overview</span></tt>.</p> | ||
| <p><tt class="docutils literal"><span class="pre">maxsize</span></tt> if specified determines the maximum number of items | ||
| that can be in the queue. Note that unlike Python's normal queue | ||
| type if this is greater than a system defined maximum then an | ||
| error is raised. If <tt class="docutils literal"><span class="pre">maxsize</span></tt> is zero then this maximum value | ||
| is used.</p> | ||
| <p><tt class="docutils literal"><span class="pre">msgsize</span></tt> if specified determines the maximum length that each | ||
| message can be (when expressed as a pickled string). If this | ||
| is greater than a system defined maximum then an error is | ||
| raised. If <tt class="docutils literal"><span class="pre">msgsize</span></tt> is zero then this maximum value is used.</p> | ||
| <p class="last">If one tries to send a message which is too long then | ||
| <tt class="docutils literal"><span class="pre">ValueError</span></tt> will be raised.</p> | ||
| </dd> | ||
@@ -275,5 +283,5 @@ </dl> | ||
| <dt><tt class="docutils literal"><span class="pre">enableLogging(level,</span> <span class="pre">HandlerType=None,</span> <span class="pre">handlerArgs=(),</span> <span class="pre">format=None)</span></tt></dt> | ||
| <dd><p class="first">Enables logging and sets the debug level to <tt class="docutils literal"><span class="pre">level</span></tt> -- see | ||
| documentation for the <tt class="docutils literal"><span class="pre">logging</span></tt> package in the standard | ||
| library.</p> | ||
| <dd><p class="first">Enables logging and sets the debug level used by the package's | ||
| logger to <tt class="docutils literal"><span class="pre">level</span></tt> -- see documentation for the <tt class="docutils literal"><span class="pre">logging</span></tt> | ||
| package in the standard library.</p> | ||
| <p>If <tt class="docutils literal"><span class="pre">HandlerType</span></tt> is specified then a handler is created using | ||
@@ -280,0 +288,0 @@ <tt class="docutils literal"><span class="pre">HandlerType(*handlerArgs)</span></tt> and this will be used by the |
+74
-63
@@ -24,3 +24,3 @@ .. include:: header.txt | ||
| **exception** `BufferTooShort` | ||
| Exception raise by the `recvbytes_into()` method of a | ||
| Exception raised by the `recvbytes_into()` method of a | ||
| `connection object <connection-ref.html#connection-objects>`_ | ||
@@ -73,78 +73,89 @@ when the supplied buffer object is too small for the message | ||
| Also note that if a process is killed while it is trying to | ||
| read or write to a pipe then the data in the pipe is likely to | ||
| become corrupted (because it may become impossible to be sure | ||
| where the message boundaries lie). | ||
| On Windows this requires the `_processing` extension. | ||
| `Queue(maxsize=0)` | ||
| Alias for either `PosixQueue` if available or else `PipeQueue` | ||
| -- see below. | ||
| Returns a process shared queue implemented using a pipe and a | ||
| few locks/semaphores. A background thread transfers objects | ||
| from a buffer into the pipe. | ||
| It differs from Python standard `Queue.Queue` type by only | ||
| have finite capacity, so even when `maxsize` is specified as | ||
| `0` the `put()` method might block. If you need to be sure | ||
| that `put()` will not block then you should use | ||
| `BufferedQueue()` instead. | ||
| It is a near clone of `Queue.Queue` except that the `qsize()` | ||
| method is not implemented and that the `task_done()` and | ||
| `join()` methods introduced in Python 2.5 are also missing. | ||
| `BufferedQueue()` | ||
| Alias for either `BufferedPosixQueue` if available or else | ||
| `BufferedPipeQueue`. | ||
| `Queue` has a few additional methods: | ||
| Differs from `Queue()` by guaranteeing that the `put()` method | ||
| will not block. A buffered queue tends to be slower than a | ||
| normal queue if you are putting items on the queue one by one | ||
| (rather than using `putmany()`). | ||
| `putmany(iterable)` | ||
| If the queue has infinite size then this adds all | ||
| items in the iterable to the queue's buffer. So | ||
| `q.putmany(X)` is a faster alternative to `for x in X: | ||
| q.put(x)`. Raises an error if the queue has finite | ||
| size. | ||
| The first time a process puts an item on a buffered queue a | ||
| thread is started whose job is to transfer items from a buffer | ||
| onto the true interprocess queue. In addition to the usual | ||
| queue methods `BufferedQueue` supports two extra: | ||
| `close()` | ||
| Indicates that no more data will be put on this queue | ||
| by the current process. The background thread will | ||
| quit once it has flushed all its data. This is called | ||
| automatically when the queue is garbage collected. | ||
| `putmany(iterable)` | ||
| Adds all items in the iterable to the queue's buffer. | ||
| So `q.putmany(X)` is a faster equivalent of | ||
| `for x in X: q.put(x)`. | ||
| `jointhread()` | ||
| This joins the background thread and can only be used | ||
| after `close()` has been called. This blocks until | ||
| the background thread exits, ensuring that all data in | ||
| the buffer has been flushed to the pipe. | ||
| `close()` | ||
| Flushes data from the buffer to the interprocess | ||
| queue, then instructs the thread to stop and waits for | ||
| it to do so. This will be called automatically when | ||
| the queue is garbage collected or when the process | ||
| exits. | ||
| By default if a process is not the creator of the | ||
| queue then on exit it will attempt to join the queue's | ||
| background thread. The process can call | ||
| `canceljoin()` to prevent this behaviour. | ||
| `PipeQueue(maxsize=0)` | ||
| Returns a near clone of `Queue.Queue` except that the | ||
| `qsize()` method is not implemented. It is implemented using | ||
| a pipe and some locks/semaphores. | ||
| On Unix if a client terminates while it is reading or writing | ||
| from the queue, other clients reading from the queue may lose | ||
| track of where message boundaries are or may retrieve | ||
| incomplete messages. At least on Windows a keyboard interrupt | ||
| (SIGINT) or the use of a process's `stop()` method should not | ||
| cause such a problem. | ||
| Placing an object on a `PipeQueue` can block because of lack | ||
| of buffer space even if a zero timeout is used. | ||
| Requires native semaphore support from `_processing`. | ||
| `canceljoin()` | ||
| Prevents the background thread from being joined | ||
| automatically when the process exits. Unnecessary if | ||
| the current process created the queue. | ||
| Note that if a process is killed while it is trying to receive | ||
| or send to a queue then the data in the queue is likely to | ||
| become corrupted (because it may become impossible to be sure | ||
| where the message boundaries lie). | ||
| `SimpleQueue()` | ||
| A simplified and faster alternative to `Queue()`. It is | ||
| really just a pipe protected by a couple of locks. | ||
| It has `get()` and `put()` methods but these do not have | ||
| `block` or `timeout` arguments. It also has an `empty()` | ||
| method. | ||
| **Warning**: Unlike with `Queue` (when `maxsize` is zero) | ||
| using `put()` may block if the pipe's buffer does not have | ||
| sufficient space, so one must take care that no deadlocks are | ||
| possible. | ||
| `PosixQueue(maxsize=0, msgsize=0)` | ||
| Returns a near clone of `Queue.Queue` implemented using a (Unix | ||
| only) posix message queue. | ||
| If `maxsize` is non-zero it determines the maximum number of | ||
| messages that can be in the queue and if `msgsize` is non-zero | ||
| it determines the maximum size in bytes of a message. If | ||
| either is zero then the system default (which is finite) is | ||
| used. (For instance on my Linux system the default maximum | ||
| number of messages in a queue is 10 and the maximum message | ||
| size is 8192 bytes.) A `PosixQueue` object has attributes | ||
| `_maxmsg` and `_maxsize` which give these limits for that | ||
| queue. | ||
| A faster alternative to `Queue()` which is available on Unix | ||
| systems which support Posix message queues. | ||
| Note that if `maxsize` or `msgsize` is larger than the system | ||
| maximum then an `OSError` exception will be thrown. On Linux | ||
| the system maximums can be viewed and modified through the | ||
| `/proc` filesystem --- see `man 7 mq_overview`. | ||
| However, posix queues have a maximum number of messages that | ||
| can occupy the queue at a given time, and each message (when | ||
| expressed as a pickled string) has a maximum length --- see | ||
| `man 7 mq_overview`. | ||
| Only available on Unix and only if support for posix queues | ||
| was built in to `_processing`. | ||
| `maxsize` if specified determines the maximum number of items | ||
| that can be in the queue. Note that unlike Python's normal queue | ||
| type if this is greater than a system defined maximum then an | ||
| error is raised. If `maxsize` is zero then this maximum value | ||
| is used. | ||
| `msgsize` if specified determines the maximum length that each | ||
| message can be (when expressed as a pickled string). If this | ||
| is greater than a system defined maximum then an error is | ||
| raised. If `msgsize` is zero then this maximum value is used. | ||
| If one tries to send a message which is too long then | ||
| `ValueError` will be raised. | ||
@@ -151,0 +162,0 @@ |
@@ -69,16 +69,2 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| </dd> | ||
| <dt><em>Queues, deadlocks and buffering</em></dt> | ||
| <dd><p class="first">Unlike Python's standard <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> type (when a maximum size | ||
| is not specified), <tt class="docutils literal"><span class="pre">processing.Queue</span></tt> does not guarantee that | ||
| <tt class="docutils literal"><span class="pre">put()</span></tt> will succeed without blocking. If you do need this | ||
| guarantee then you can use <tt class="docutils literal"><span class="pre">processing.BufferedQueue</span></tt> instead.</p> | ||
| <p class="last">As an example, a standard pattern is for a main process to push | ||
| some tasks onto a task queue so that they will be processed by a | ||
| pool of worker processes and then for the main process to retrieve | ||
| the results from a result queue. If both queues are unbuffered | ||
| then one will get a deadlock if one tries to push too many tasks | ||
| on the task queue without removing any from the result queue. | ||
| Instead one should probably make the task queue buffered -- see | ||
| <a class="reference" href="../test/test_workers.py">test_workers.py</a>.</p> | ||
| </dd> | ||
| </dl> | ||
@@ -88,4 +74,3 @@ </div> | ||
| <h1><a id="windows" name="windows">Windows</a></h1> | ||
| <p>Platforms such as Windows which lack <tt class="docutils literal"><span class="pre">os.fork()</span></tt> have a few extra | ||
| restrictions:</p> | ||
| <p>Since Windows lacks <tt class="docutils literal"><span class="pre">os.fork()</span></tt> it has a few extra restrictions:</p> | ||
| <p><em>More picklability</em>:</p> | ||
@@ -92,0 +77,0 @@ <blockquote> |
@@ -69,23 +69,7 @@ .. include:: header.txt | ||
| *Queues, deadlocks and buffering* | ||
| Unlike Python's standard `Queue.Queue` type (when a maximum size | ||
| is not specified), `processing.Queue` does not guarantee that | ||
| `put()` will succeed without blocking. If you do need this | ||
| guarantee then you can use `processing.BufferedQueue` instead. | ||
| As an example, a standard pattern is for a main process to push | ||
| some tasks onto a task queue so that they will be processed by a | ||
| pool of worker processes and then for the main process to retrieve | ||
| the results from a result queue. If both queues are unbuffered | ||
| then one will get a deadlock if one tries to push too many tasks | ||
| on the task queue without removing any from the result queue. | ||
| Instead one should probably make the task queue buffered -- see | ||
| `test_workers.py <../test/test_workers.py>`_. | ||
| Windows | ||
| ------- | ||
| Platforms such as Windows which lack `os.fork()` have a few extra | ||
| restrictions: | ||
| Since Windows lacks `os.fork()` it has a few extra restrictions: | ||
@@ -92,0 +76,0 @@ *More picklability*: |
+4
-0
@@ -44,2 +44,6 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| </ul> | ||
| <p>The project is hosted at</p> | ||
| <ul class="simple"> | ||
| <li><a class="reference" href="http://developer.berlios.de/projects/pyprocessing">http://developer.berlios.de/projects/pyprocessing</a></li> | ||
| </ul> | ||
| <p>The package can be downloaded from</p> | ||
@@ -46,0 +50,0 @@ <ul class="simple"> |
+80
-87
@@ -8,3 +8,3 @@ import mmap | ||
| from processing import synchronize, process | ||
| from processing import synchronize, process, queue, _processing | ||
| from struct import pack as _pack, unpack as _unpack, calcsize as _calcsize | ||
@@ -14,3 +14,2 @@ | ||
| # | ||
@@ -24,36 +23,39 @@ # Heap implementation | ||
| ''' | ||
| def __init__(self, size=256, _name=None, _lock=None): | ||
| assert size >= 256 | ||
| def __init__(self, size, lock=None): | ||
| fd, self._name = tempfile.mkstemp(prefix='pym-') | ||
| self._size = remaining = size | ||
| while remaining > 0: | ||
| remaining -= os.write(fd, '\0' * remaining) | ||
| self._lock = lock or RLock() | ||
| self._mmap = mmap.mmap(fd, self._size) | ||
| os.close(fd) | ||
| self._set_pos(8) | ||
| if _name is None: | ||
| fd, self._name = tempfile.mkstemp(prefix='pym-') | ||
| remaining = size | ||
| while remaining > 0: | ||
| remaining -= os.write(fd, '\0' * remaining) | ||
| self._lock = _lock or RLock() | ||
| if sys.platform in ('win32', 'cygwin'): | ||
| process.Finalize(self, Heap._finalize_heap, | ||
| args=[self._mmap, self._name], atexit=True) | ||
| else: | ||
| assert _lock is not None | ||
| self._lock = _lock | ||
| self._name = _name | ||
| fd = os.open(_name, os.O_RDWR, 0600) | ||
| os.unlink(self._name) | ||
| self._size = size | ||
| def __getstate__(self): | ||
| if sys.platform != 'win32': | ||
| raise NotImplemented | ||
| return self._size, self._name, self._lock | ||
| def __setstate__(self, state): | ||
| self._size, self._name, self._lock = state | ||
| fd = os.open(self._name, os.O_RDWR, 0600) | ||
| self._mmap = mmap.mmap(fd, self._size) | ||
| os.close(fd) | ||
| if _name is None: | ||
| self._set_pos(8) | ||
| if sys.platform in ('win32', 'cygwin'): | ||
| def close_mmap(mmap, name): | ||
| import os | ||
| mmap.close() | ||
| os.unlink(name) | ||
| process.Finalize( | ||
| self, close_mmap, args=[self._mmap, self._name], | ||
| atexit=True | ||
| ) | ||
| else: | ||
| os.unlink(self._name) | ||
| @staticmethod | ||
| def _finalize_heap(mmap, name): | ||
| import os | ||
| mmap.close() | ||
| try: | ||
| os.unlink(name) | ||
| except WindowsError: | ||
| process.info('failed to unlink %r -- try again later', name) | ||
| process._trylater.append((os.unlink, [name])) | ||
| def malloc(self, bytes): | ||
@@ -88,53 +90,49 @@ ''' | ||
| ''' | ||
| self._lock.acquire() | ||
| try: | ||
| # resize mmap if the file has been enlarged by other process | ||
| if self._size < self._mmap.size(): | ||
| self._mmap.resize(self._mmap.size()) | ||
| self._size = len(self._mmap) | ||
| start = stop = None | ||
| # iterate over empty chunks | ||
| for start, stop in self._get_empty_chunks(): | ||
| # if chunk is large enough | ||
| if stop - start >= bytes: | ||
| # write header for the chunk we are creating | ||
| self._set_chunk_info(start, start + bytes, True) | ||
| # write header for following empty chunk if necessary | ||
| if start + bytes < stop: | ||
| self._set_chunk_info(start + bytes, stop, False) | ||
| # update position of last empty chunk | ||
| if start == self._get_pos(): | ||
| try: | ||
| pos, _ = self._get_empty_chunks().next() | ||
| except StopIteration: | ||
| pos = self._size | ||
| self._set_pos(pos) | ||
| return start | ||
| # get position of memory just after last occupied chunk | ||
| if stop == self._size: | ||
| pos = start | ||
| else: | ||
| pos = self._size | ||
| # work out how much space we need and resize mmap | ||
| while pos + bytes > self._size: | ||
| self._size *= 2 | ||
| if self._size > len(self._mmap): | ||
| self._mmap.resize(self._size) | ||
| # mark chunk as occupied and update position of last empty chunk | ||
| self._set_chunk_info(pos, pos + bytes, True) | ||
| if pos == self._get_pos(): | ||
| self._set_pos(pos + bytes) | ||
| return pos | ||
| finally: | ||
| self._lock.release() | ||
| # resize mmap if the file has been enlarged by other process | ||
| if self._size < self._mmap.size(): | ||
| self._mmap.resize(self._mmap.size()) | ||
| self._size = len(self._mmap) | ||
| start = stop = None | ||
| # iterate over empty chunks | ||
| for start, stop in self._get_empty_chunks(): | ||
| # if chunk is large enough | ||
| if stop - start >= bytes: | ||
| # write header for the chunk we are creating | ||
| self._set_chunk_info(start, start + bytes, True) | ||
| # write header for following empty chunk if necessary | ||
| if start + bytes < stop: | ||
| self._set_chunk_info(start + bytes, stop, False) | ||
| # update position of last empty chunk | ||
| if start == self._get_pos(): | ||
| try: | ||
| pos, _ = self._get_empty_chunks().next() | ||
| except StopIteration: | ||
| pos = self._size | ||
| self._set_pos(pos) | ||
| return start | ||
| # get position of memory just after last occupied chunk | ||
| if stop == self._size: | ||
| pos = start | ||
| else: | ||
| pos = self._size | ||
| # work out how much space we need and resize mmap | ||
| while pos + bytes > self._size: | ||
| self._size *= 2 | ||
| if self._size > len(self._mmap): | ||
| self._mmap.resize(self._size) | ||
| # mark chunk as occupied and update position of last empty chunk | ||
| self._set_chunk_info(pos, pos + bytes, True) | ||
| if pos == self._get_pos(): | ||
| self._set_pos(pos + bytes) | ||
| return pos | ||
| def _free(self, start): | ||
@@ -236,7 +234,2 @@ ''' | ||
| if sys.platform == 'win32': | ||
| def __reduce__(self): | ||
| return type(self), (self._size, self._name, self._lock) | ||
| # | ||
@@ -459,3 +452,3 @@ # Class for a struct which lives in shared memory | ||
| Event = synchronize.Event | ||
| Queue = synchronize.Queue | ||
| Queue = queue.Queue | ||
| Lock = synchronize.Lock | ||
@@ -471,3 +464,3 @@ RLock = synchronize.RLock | ||
| if not hasattr(self, '_heap'): | ||
| self._heap = Heap(size=self._size, _lock=self._lock) | ||
| self._heap = Heap(size=self._size, lock=self._lock) | ||
| return self._heap | ||
@@ -474,0 +467,0 @@ finally: |
+30
-39
@@ -23,3 +23,3 @@ # | ||
| from processing.process import Process, currentProcess, Finalize | ||
| from processing.process import debug, info, log | ||
| from processing.process import debug, info, subdebug, activeChildren | ||
@@ -384,3 +384,6 @@ try: | ||
| info('manager received shutdown message') | ||
| process._exit_func() | ||
| Finalize._run_all_finalizers() | ||
| for p in activeChildren(): | ||
| debug('terminating a child process of manager') | ||
| p.terminate() | ||
| info('process exiting with `os.exit(0)`') | ||
@@ -430,25 +433,2 @@ os._exit(0) | ||
| def _run_server(registry, address, authkey, parent_address): | ||
| ''' | ||
| Instantiate a server and run it. | ||
| This function is invoked in a subprocess. | ||
| Really it should be a staticmethod of `BaseManager`, but that | ||
| would cause problems with pickling. | ||
| ''' | ||
| # create server | ||
| server = Server(registry, address, authkey) | ||
| currentProcess()._server = server | ||
| # inform parent process of the server's address | ||
| connection = Client(parent_address) | ||
| connection.send(server.address) | ||
| connection.close() | ||
| # run the manager | ||
| info('manager bound to %r' % server.address) | ||
| server.serve_forever() | ||
| class BaseManager(object): | ||
@@ -516,2 +496,20 @@ ''' | ||
| @classmethod | ||
| def _run_server(cls, registry, address, authkey, parent_address): | ||
| ''' | ||
| Instantiate a server and run it | ||
| ''' | ||
| # create server | ||
| server = Server(registry, address, authkey) | ||
| currentProcess()._server = server | ||
| # inform parent process of the server's address | ||
| connection = Client(parent_address) | ||
| connection.send(server.address) | ||
| connection.close() | ||
| # run the manager | ||
| info('manager bound to %r', server.address) | ||
| server.serve_forever() | ||
| def serve_forever(self): | ||
@@ -568,3 +566,3 @@ ''' | ||
| # staticmethod | ||
| @staticmethod | ||
| def _get_registry_creators(self_or_cls): | ||
@@ -582,4 +580,2 @@ registry = {} | ||
| return registry, creators | ||
| _get_registry_creators = staticmethod(_get_registry_creators) | ||
@@ -592,3 +588,3 @@ def __enter__(self): | ||
| # staticmethod | ||
| @staticmethod | ||
| def _finalize_manager(process, address, authkey): | ||
@@ -617,6 +613,2 @@ ''' | ||
| _finalize_manager = staticmethod(_finalize_manager) | ||
| _run_server = staticmethod(_run_server) | ||
| address = property(lambda self: self._address) | ||
@@ -752,3 +744,3 @@ | ||
| # staticmethod | ||
| @staticmethod | ||
| def _finalize_proxy(connection, key, token, private): | ||
@@ -777,3 +769,2 @@ private._mutex.acquire() | ||
| private._mutex.release() | ||
| _finalize_proxy = staticmethod(_finalize_proxy) | ||
@@ -792,3 +783,3 @@ def __reduce__(self): | ||
| return '<Proxy[%s] object at %s>' % (self._token.typeid, | ||
| '0x%x' % (id(self) & 0xffffffff)) | ||
| '0x%x' % id(self)) | ||
@@ -872,3 +863,3 @@ def __str__(self): | ||
| ''' | ||
| log(5, 'reset all proxies') | ||
| subdebug('reset all proxies') | ||
| _Private._mutex.acquire() | ||
@@ -883,4 +874,4 @@ try: | ||
| proxy._connect(authkey=authkey, name=process_name) | ||
| except KeyError: | ||
| pass | ||
| except KeyError, e: | ||
| debug('ignoring KeyError: %r', e) | ||
| finally: | ||
@@ -887,0 +878,0 @@ _Private._mutex.release() |
+4
-1
| Metadata-Version: 1.0 | ||
| Name: processing | ||
| Version: 0.37 | ||
| Version: 0.38 | ||
| Summary: Package for using processes mimicking the `threading` module | ||
@@ -29,3 +29,6 @@ Home-page: http://developer.berlios.de/projects/pyprocessing | ||
| The project is hosted at | ||
| * http://developer.berlios.de/projects/pyprocessing | ||
| The package can be downloaded from | ||
@@ -32,0 +35,0 @@ |
+171
-80
@@ -21,3 +21,13 @@ # | ||
| from processing.process import debug | ||
| # | ||
| # Constants representing the state of a pool | ||
| # | ||
| RUN = 0 | ||
| CLOSE = 1 | ||
| TERMINATE = 2 | ||
| # | ||
| # Miscellaneous | ||
@@ -39,2 +49,4 @@ # | ||
| def worker(inqueue, outqueue): | ||
| put = outqueue.put | ||
| for job, i, func, args, kwds in iter(inqueue.get, None): | ||
@@ -45,8 +57,10 @@ try: | ||
| result = (False, e) | ||
| outqueue.put((job, i, result)) | ||
| put((job, i, result)) | ||
| debug('worker got None -- exiting') | ||
| # | ||
| # Class representing a process pool | ||
| # | ||
| class Pool(object): | ||
@@ -57,6 +71,7 @@ ''' | ||
| def __init__(self, processes=None): | ||
| self._inqueue = processing.PipeQueue() | ||
| self._outqueue = processing.PipeQueue() | ||
| self._inqueue = processing.SimpleQueue() | ||
| self._outqueue = processing.SimpleQueue() | ||
| self._taskqueue = Queue.Queue() | ||
| self._cache = {} | ||
| self._state = RUN | ||
@@ -79,24 +94,26 @@ if processes is None: | ||
| # Note: these threads will be joined by the finalizer and are | ||
| # only made daemonic because otherwise the exit handler | ||
| # registered by `threading` might try to join them | ||
| # before the finalizer is run | ||
| thandler = threading.Thread(target=Pool._handle_tasks, | ||
| args=[self._taskqueue, self._inqueue]) | ||
| thandler.setDaemon(True) | ||
| thandler.start() | ||
| self._task_handler = threading.Thread( | ||
| target=Pool._handle_tasks, | ||
| args=[self._taskqueue, self._inqueue, self._outqueue, | ||
| self._pool] | ||
| ) | ||
| self._task_handler.setDaemon(True) | ||
| self._task_handler._state = RUN | ||
| self._task_handler.start() | ||
| rhandler = threading.Thread(target=Pool._handle_results, | ||
| args=[self._outqueue, self._cache]) | ||
| rhandler.setDaemon(True) | ||
| rhandler.start() | ||
| self._result_handler = threading.Thread( | ||
| target=Pool._handle_results, | ||
| args=[self._outqueue, self._cache] | ||
| ) | ||
| self._result_handler.setDaemon(True) | ||
| self._result_handler._state = RUN | ||
| self._result_handler.start() | ||
| self.shutdown = processing.Finalize( | ||
| self, Pool._finalize_pool, | ||
| args=(self._taskqueue, self._inqueue, self._outqueue, | ||
| thandler, rhandler, self._pool), | ||
| self._terminate = processing.Finalize( | ||
| self, Pool._terminate_pool, | ||
| args=[self._taskqueue, self._outqueue, self._cache, self._pool, | ||
| self._task_handler, self._result_handler], | ||
| atexit=True | ||
| ) | ||
| def apply(self, func, args=(), kwds={}): | ||
@@ -106,30 +123,48 @@ ''' | ||
| ''' | ||
| assert self._state == RUN | ||
| return self.apply_async(func, args, kwds).get() | ||
| def map(self, func, seq, chunksize=None): | ||
| def map(self, func, iterable, chunksize=None): | ||
| ''' | ||
| Equivalent of `map()` builtin | ||
| ''' | ||
| return self.map_async(func, seq, chunksize).get() | ||
| assert self._state == RUN | ||
| return self.map_async(func, iterable, chunksize).get() | ||
| def imap(self, func, seq): | ||
| def imap(self, func, iterable, chunksize=1): | ||
| ''' | ||
| Equivalent of `itertool.imap()` -- can be MUCH slower than `Pool.map()` | ||
| ''' | ||
| result = IMapIterator(self._cache) | ||
| job = result._job | ||
| self._taskqueue.put((((job, i, func, (x,), {}) | ||
| for i, x in enumerate(seq)), result._setlength)) | ||
| return result | ||
| assert self._state == RUN | ||
| if chunksize == 1: | ||
| result = IMapIterator(self._cache) | ||
| self._taskqueue.put((((result._job, i, func, (x,), {}) | ||
| for i, x in enumerate(iterable)), result._setlength)) | ||
| return result | ||
| else: | ||
| assert chunksize > 1 | ||
| task_batches = Pool._gettasks(func, iterable, chunksize) | ||
| result = IMapIterator(self._cache) | ||
| self._taskqueue.put((((result._job, i, mapstar, (x,), {}) | ||
| for i, x in enumerate(task_batches)), result._setlength)) | ||
| return (item for chunk in result for item in chunk) | ||
| def imap_unordered(self, func, seq): | ||
| def imap_unordered(self, func, iterable, chunksize=1): | ||
| ''' | ||
| Like `imap()` method but ordering of results is arbitrary | ||
| ''' | ||
| result = IMapUnorderedIterator(self._cache) | ||
| job = result._job | ||
| self._taskqueue.put((((job, i, func, (x,), {}) | ||
| for i, x in enumerate(seq)), result._setlength)) | ||
| return result | ||
| assert self._state == RUN | ||
| if chunksize == 1: | ||
| result = IMapUnorderedIterator(self._cache) | ||
| self._taskqueue.put((((result._job, i, func, (x,), {}) | ||
| for i, x in enumerate(iterable)), result._setlength)) | ||
| return result | ||
| else: | ||
| assert chunksize > 1 | ||
| task_batches = Pool._gettasks(func, iterable, chunksize) | ||
| result = IMapUnorderedIterator(self._cache) | ||
| self._taskqueue.put((((result._job, i, mapstar, (x,), {}) | ||
| for i, x in enumerate(task_batches)), result._setlength)) | ||
| return (item for chunk in result for item in chunk) | ||
| def apply_async(self, func, args=(), kwds={}, callback=None): | ||
@@ -139,30 +174,35 @@ ''' | ||
| ''' | ||
| assert self._state == RUN | ||
| result = ApplyResult(self._cache, callback) | ||
| job = result._job | ||
| self._taskqueue.put(([(job, None, func, args, kwds)], None)) | ||
| self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) | ||
| return result | ||
| def map_async(self, func, seq, chunksize=None, callback=None): | ||
| def map_async(self, func, iterable, chunksize=None, callback=None): | ||
| ''' | ||
| Asynchronous equivalent of `map()` builtin | ||
| ''' | ||
| if not hasattr(seq, '__len__'): | ||
| seq = list(seq) | ||
| assert self._state == RUN | ||
| if not hasattr(iterable, '__len__'): | ||
| iterable = list(iterable) | ||
| if chunksize is None: | ||
| chunksize, extra = divmod(len(seq), len(self._pool) * 4) | ||
| chunksize, extra = divmod(len(iterable), len(self._pool) * 4) | ||
| if extra: | ||
| chunksize += 1 | ||
| task_batches = Pool._gettasks(func, seq, chunksize) | ||
| result = MapResult(self._cache, chunksize, len(seq), callback) | ||
| job = result._job | ||
| self._taskqueue.put((((job, i, mapstar, (x,), {}) | ||
| task_batches = Pool._gettasks(func, iterable, chunksize) | ||
| result = MapResult(self._cache, chunksize, len(iterable), callback) | ||
| self._taskqueue.put((((result._job, i, mapstar, (x,), {}) | ||
| for i, x in enumerate(task_batches)), None)) | ||
| return result | ||
| # staticmethod | ||
| def _handle_tasks(taskqueue, inqueue): | ||
| put = inqueue._put | ||
| @staticmethod | ||
| def _handle_tasks(taskqueue, inqueue, outqueue, pool): | ||
| thread = threading.currentThread() | ||
| put = inqueue._writer.send | ||
| for taskseq, setlength in iter(taskqueue.get, None): | ||
| if thread._state: | ||
| debug('task handler found thread._state != RUN -- exiting') | ||
| return | ||
| i = -1 | ||
@@ -173,7 +213,25 @@ for i, task in enumerate(taskseq): | ||
| setlength(i+1) | ||
| _handle_tasks = staticmethod(_handle_tasks) | ||
| else: | ||
| debug('task handler got None') | ||
| # staticmethod | ||
| # tell workers there is no more work | ||
| debug('task handler sending None to workers') | ||
| for p in pool: | ||
| put(None) | ||
| # tell result handler to finish when cache is empty | ||
| outqueue.put(None) | ||
| debug('task handler exiting') | ||
| @staticmethod | ||
| def _handle_results(outqueue, cache): | ||
| for job, i, obj in iter(outqueue._get, None): | ||
| thread = threading.currentThread() | ||
| get = outqueue._reader.recv | ||
| for job, i, obj in iter(get, None): | ||
| if thread._state: | ||
| assert thread._state == TERMINATE | ||
| debug('result handler found thread._state=TERMINATE') | ||
| return | ||
| try: | ||
@@ -183,31 +241,19 @@ cache[job]._set(i, obj) | ||
| pass | ||
| _handle_results = staticmethod(_handle_results) | ||
| # staticmethod | ||
| def _finalize_pool(taskqueue, inqueue, outqueue, | ||
| task_handler, result_handler, pool): | ||
| # stop task handler thread | ||
| taskqueue.not_empty.acquire() | ||
| try: | ||
| taskqueue.queue.clear() | ||
| taskqueue.queue.append(None) | ||
| taskqueue.not_empty.notify() | ||
| finally: | ||
| taskqueue.not_empty.release() | ||
| else: | ||
| debug('result handler got None') | ||
| # stop result handler thread | ||
| outqueue.put(None) | ||
| while cache and thread._state == RUN: | ||
| item = get() | ||
| if item is None: | ||
| debug('result handler ignoring None') | ||
| job, i, obj = item | ||
| try: | ||
| cache[job]._set(i, obj) | ||
| except KeyError: | ||
| pass | ||
| # stop work processes | ||
| for p in pool: | ||
| inqueue.put(None) | ||
| debug('result handler exiting: len(cache)=%s, thread._state=%s', | ||
| len(cache), thread._state) | ||
| # join all the threads and processes | ||
| task_handler.join() | ||
| result_handler.join() | ||
| for p in pool: | ||
| p.join() | ||
| _finalize_pool = staticmethod(_finalize_pool) | ||
| # staticmethod | ||
| @staticmethod | ||
| def _gettasks(func, it, size): | ||
@@ -220,4 +266,49 @@ it = iter(it) | ||
| yield (func, x) | ||
| _gettasks = staticmethod(_gettasks) | ||
| def __reduce__(self): | ||
| raise NotImplemented | ||
| def close(self): | ||
| debug('closing pool') | ||
| self._state = CLOSE | ||
| self._taskqueue.put(None) | ||
| def terminate(self): | ||
| debug('terminating pool') | ||
| self._state = TERMINATE | ||
| self._terminate() | ||
| def join(self): | ||
| debug('joining pool') | ||
| assert self._state in (CLOSE, TERMINATE) | ||
| self._task_handler.join() | ||
| self._result_handler.join() | ||
| for p in self._pool: | ||
| p.join() | ||
| shutdown = terminate # depracated alias | ||
| @staticmethod | ||
| def _terminate_pool(taskqueue, outqueue, cache, pool, | ||
| task_handler, result_handler): | ||
| debug('finalizing pool') | ||
| if not result_handler.isAlive(): | ||
| debug('result handler already finished -- no need to terminate') | ||
| return | ||
| cache = {} | ||
| task_handler._state = TERMINATE | ||
| result_handler._state = TERMINATE | ||
| taskqueue.put(None) | ||
| outqueue.put(None) | ||
| debug('terminating pool workers') | ||
| for p in pool: | ||
| if p.isAlive(): | ||
| p.terminate() | ||
| task_handler.join() | ||
| result_handler.join() | ||
| # | ||
@@ -224,0 +315,0 @@ # Class whose instances are returned by `Pool.apply_async()` |
+132
-82
@@ -11,3 +11,3 @@ # | ||
| 'Process', 'currentProcess', 'activeChildren', 'freezeSupport', | ||
| 'ProcessExit', 'Finalize', 'getLogger', 'enableLogging' | ||
| 'ProcessExit', 'Finalize', 'getLogger', 'enableLogging', 'note' | ||
| ] | ||
@@ -19,3 +19,12 @@ | ||
| import os, sys, signal, subprocess, time, atexit, weakref, random, itertools | ||
| import os | ||
| import sys | ||
| import signal | ||
| import subprocess | ||
| import time | ||
| import atexit | ||
| import weakref | ||
| import random | ||
| import itertools | ||
| import copy_reg | ||
| import encodings.hex_codec # hint to freeze tools that we need hex codec | ||
@@ -266,3 +275,4 @@ | ||
| return '<%s(%s, %s)>' % (type(self).__name__, self._name, status) | ||
| return '<%s(%s, %s%s)>' % (type(self).__name__, self._name, | ||
| status, self._daemonic and ' daemon' or '') | ||
@@ -282,4 +292,4 @@ ## | ||
| _current_process = self | ||
| info('process starting up') | ||
| try: | ||
| info('child process calling self.run()') | ||
| self.run() | ||
@@ -371,3 +381,3 @@ exitcode = 0 | ||
| ''' | ||
| log(5, 'dummy reset all proxies') | ||
| subdebug('dummy reset all proxies') | ||
@@ -523,58 +533,15 @@ # | ||
| # | ||
| # Support for finalization of objects using weakrefs | ||
| # Make instance and class methods picklable | ||
| # | ||
| class Finalize(object): | ||
| ''' | ||
| Register a callback to be run once before 'obj' is garbage collected. | ||
| If 'obj' is not collected by time of process termination then the | ||
| callback will be run if 'atexit' parameter of constructor was true. | ||
| ''' | ||
| _registry = {} | ||
| _getindex = itertools.count().next | ||
| def __init__(self, obj, callback, args=(), atexit=False): | ||
| self._weakref = weakref.ref(obj, self) | ||
| self._callback = callback | ||
| self._args = args | ||
| self._atexit = atexit | ||
| self._index = Finalize._getindex() | ||
| Finalize._registry[self] = None | ||
| def __call__(self, ignore=None): | ||
| ''' | ||
| Run the callback if it has not already been run | ||
| ''' | ||
| log(5, 'calling %s from finalizer', self._callback) | ||
| try: | ||
| del Finalize._registry[self] | ||
| except KeyError: | ||
| pass | ||
| else: | ||
| self._callback(*self._args) | ||
| self._weakref = self._callback = self._args = None | ||
| _MethodType = type(Process.start) | ||
| def __repr__(self): | ||
| return '<Finalizer(callback=%r, args=%r, atexit=%r)>' % \ | ||
| (self._callback, self._args, self._atexit) | ||
| def _reduce_method(m): | ||
| if m.im_self is None: | ||
| return getattr, (m.im_class, m.im_func.func_name) | ||
| else: | ||
| return getattr, (m.im_self, m.im_func.func_name) | ||
| # staticmethod | ||
| def _run_all_finalizers(): | ||
| ''' | ||
| Run remaining callbacks (in reverse order of registration) | ||
| ''' | ||
| L = Finalize._registry.keys() | ||
| L.sort(cmp=lambda x,y: -cmp(x._index, y._index)) | ||
| copy_reg.pickle(_MethodType, _reduce_method) | ||
| for finalizer in L: | ||
| if finalizer._atexit: | ||
| try: | ||
| finalizer() | ||
| except Exception: | ||
| import traceback | ||
| traceback.print_exc() | ||
| _run_all_finalizers = staticmethod(_run_all_finalizers) | ||
| # | ||
@@ -586,2 +553,6 @@ # Logging | ||
| def subdebug(msg, *args): | ||
| if _logger: | ||
| _logger.subdebug(msg, *args) | ||
| def debug(msg, *args): | ||
@@ -595,5 +566,8 @@ if _logger: | ||
| def log(level, msg, *args): | ||
| def note(msg, *args): | ||
| if _logger: | ||
| _logger.log(level, msg, *args) | ||
| _logger.note(msg, *args) | ||
| else: | ||
| print >>sys.stderr, ('[NOTE/%s] ' + msg) % \ | ||
| ((currentProcess().getName(),)+args) | ||
@@ -618,3 +592,2 @@ def getLogger(): | ||
| _logger.propagate = 0 | ||
| logging.addLevelName(5, 'SUBDEBUG') | ||
@@ -628,2 +601,7 @@ # we want `_logger` to support the "%(processName)s" format | ||
| logging.addLevelName(5, 'SUBDEBUG') | ||
| logging.addLevelName(31, 'NOTE') | ||
| _logger.subdebug = _MethodType(_logger.log, 5) | ||
| _logger.note = _MethodType(_logger.log, 31) | ||
| # cleanup func of `processing` should run before that of `logging` | ||
@@ -648,2 +626,71 @@ atexit._exithandlers.remove((_exit_func, (), {})) | ||
| # | ||
| # Support for finalization of objects using weakrefs | ||
| # | ||
| class Finalize(object): | ||
| ''' | ||
| Register a callback to be run once before 'obj' is garbage collected. | ||
| If 'obj' is not collected by time of process termination then the | ||
| callback will be run if 'atexit' parameter of constructor was true. | ||
| ''' | ||
| _registry = {} | ||
| _getindex = itertools.count().next | ||
| _exiting = False | ||
| def __init__(self, obj, callback, args=(), priority=0, atexit=False): | ||
| self._weakref = weakref.ref(obj, self) | ||
| self._callback = callback | ||
| self._args = args | ||
| self._priority = priority | ||
| self._atexit = atexit | ||
| self._orderkey = (priority, Finalize._getindex()) | ||
| Finalize._registry[self] = None | ||
| def __call__(self, ignore=None): | ||
| ''' | ||
| Run the callback if it has not already been run | ||
| ''' | ||
| subdebug('calling %s from finalizer %s', self._callback,self._orderkey) | ||
| try: | ||
| del Finalize._registry[self] | ||
| except KeyError: | ||
| subdebug('finalizer %s already called', self._callback) | ||
| else: | ||
| self._callback(*self._args) | ||
| self._weakref = self._callback = self._args = None | ||
| def cancel(self): | ||
| ''' | ||
| Cancel the callback | ||
| ''' | ||
| try: | ||
| del Finalize._registry[self] | ||
| except KeyError: | ||
| pass | ||
| def __repr__(self): | ||
| return '<Finalize(callback=%r, args=%r, atexit=%r)>' % \ | ||
| (self._callback, self._args, self._atexit) | ||
| @staticmethod | ||
| def _run_all_finalizers(): | ||
| ''' | ||
| Run remaining callbacks (in reverse order of registration) | ||
| ''' | ||
| Finalize._exiting = True | ||
| L = Finalize._registry.keys() | ||
| L.sort(key=lambda f: f._orderkey, reverse=True) | ||
| for finalizer in L: | ||
| if finalizer._atexit: | ||
| try: | ||
| finalizer() | ||
| except Exception: | ||
| import traceback | ||
| traceback.print_exc() | ||
| _trylater = [] | ||
| # | ||
| # Clean up on exit | ||
@@ -653,29 +700,32 @@ # | ||
| def _exit_func(): | ||
| try: | ||
| info('running all "atexit" finalizers') | ||
| Finalize._run_all_finalizers() | ||
| finally: | ||
| for p in activeChildren(): | ||
| if p._daemonic and p._stoppable: | ||
| info('calling `stop()` for daemon %s', p.getName()) | ||
| p._popen.stop() | ||
| info('running all "atexit" finalizers') | ||
| Finalize._run_all_finalizers() | ||
| deadline = time.time() + 0.1 | ||
| for p in activeChildren(): | ||
| if p._daemonic and p._stoppable: | ||
| info('calling `stop()` for daemon %s', p.getName()) | ||
| p._popen.stop() | ||
| for p in activeChildren(): | ||
| if p._daemonic and p._stoppable: | ||
| info('calling `join(timeout)` for daemon %s', p.getName()) | ||
| p.join(deadline - time.time()) | ||
| for p in activeChildren(): | ||
| if p._daemonic: | ||
| info('calling `terminate()` for daemon %s', p.getName()) | ||
| p._popen.terminate() | ||
| deadline = time.time() + 0.1 | ||
| for p in activeChildren(): | ||
| if not p._daemonic: | ||
| info('calling `join()` for normal process %s', p.getName()) | ||
| p.join() | ||
| for p in activeChildren(): | ||
| if p._daemonic and p._stoppable: | ||
| info('calling `join(timeout)` for daemon %s', p.getName()) | ||
| p.join(deadline - time.time()) | ||
| for p in activeChildren(): | ||
| if p._daemonic: | ||
| info('calling `terminate()` for daemon %s', p.getName()) | ||
| p._popen.terminate() | ||
| for p in activeChildren(): | ||
| info('calling `join()` for process %s', p.getName()) | ||
| p.join() | ||
| for (func, args) in _trylater: | ||
| try: | ||
| func(*args) | ||
| except: | ||
| info('failed to do "trylater" call: %s', (func, args)) | ||
| atexit.register(_exit_func) |
+3
-0
@@ -30,3 +30,6 @@ =================== | ||
| The project is hosted at | ||
| * http://developer.berlios.de/projects/pyprocessing | ||
| The package can be downloaded from | ||
@@ -33,0 +36,0 @@ |
+22
-21
@@ -17,2 +17,5 @@ # | ||
| # | ||
| # | ||
| # | ||
@@ -23,12 +26,13 @@ if (not hasattr(_processing, 'HandleFromPidHandle') and | ||
| try: | ||
| fromfd = socket.fromfd | ||
| except AttributeError: | ||
| def fromfd(fd, family, type, proto=0): | ||
| s = socket._socket.socket() | ||
| _processing.changefd(s, fd, family, type, proto) | ||
| return s | ||
| if sys.platform == 'win32': | ||
| def fromfd(fd, family, type, proto=0): | ||
| assert family == socket.AF_INET | ||
| assert type == socket.SOCK_STREAM | ||
| assert proto == 0 | ||
| return _processing.falsesocket(fd) | ||
| closefd = _processing.CloseHandle | ||
| else: | ||
| fromfd = socket.fromfd | ||
| closefd = os.close | ||
@@ -49,3 +53,3 @@ | ||
| def reduce_handle(handle): | ||
| process.log(5, 'reducing handle %d', handle) | ||
| process.subdebug('reducing handle %d', handle) | ||
| return (os.getpid(), handle) | ||
@@ -55,3 +59,3 @@ | ||
| pid, old_handle = reduced_handle | ||
| process.log(5, 'rebuilding handle %d from PID=%d', old_handle, pid) | ||
| process.subdebug('rebuilding handle %d from PID=%d', old_handle, pid) | ||
| return _processing.HandleFromPidHandle(pid, old_handle) | ||
@@ -76,2 +80,3 @@ | ||
| # | ||
| def reduce_pipe_connection(conn): | ||
@@ -138,3 +143,3 @@ return rebuild_pipe_connection, (reduce_handle(conn.fileno()),) | ||
| _fd_cache.add(dup_fd) | ||
| process.log(5, 'reducing fd %d', fd) | ||
| process.subdebug('reducing fd %d', fd) | ||
| return (_fd_listener.address, dup_fd) | ||
@@ -145,3 +150,3 @@ | ||
| address, fd = reduced_handle | ||
| process.log(5, 'rebuilding fd %d', fd) | ||
| process.subdebug('rebuilding fd %d', fd) | ||
| conn = Client(address, authenticate=True) | ||
@@ -191,13 +196,9 @@ conn.send(fd) | ||
| # have to guess family, type, proto | ||
| try: | ||
| address = s.getsockname() | ||
| except AttributeError: | ||
| Family, Type, Proto = socket.AF_INET, socket.SOCK_STREAM, 0 | ||
| address = s.getsockname() | ||
| if type(address) is str: | ||
| Family = socket.AF_UNIX | ||
| else: | ||
| if type(address) is str: | ||
| Family = socket.AF_UNIX | ||
| else: | ||
| Family = socket.AF_INET | ||
| Type = s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE) | ||
| Proto = 0 | ||
| Family = socket.AF_INET | ||
| Type = s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE) | ||
| Proto = 0 | ||
| reduced_handle = reduce_handle(s.fileno()) | ||
@@ -204,0 +205,0 @@ return rebuild_socket, (reduced_handle, Family, Type, Proto) |
+1
-7
@@ -142,9 +142,3 @@ # | ||
| if sys.version_info >= (2, 4, 0): | ||
| kwds['package_data'] = {'processing': data} | ||
| else: | ||
| # not sure if it will install on Python 2.3 -- probably not | ||
| processing_path = join(dirname(os.__file__), 'site-packages', 'processing') | ||
| kwds['data_files'] = [(join(processing_path, dirname(pat)), glob(pat)) | ||
| for pat in data] | ||
| kwds['package_data'] = {'processing': data} | ||
@@ -151,0 +145,0 @@ # |
+17
-289
@@ -15,2 +15,8 @@ /* | ||
| #define CHECKHANDLE(self) \ | ||
| if (self->handle == INVALID_HANDLE) { \ | ||
| PyErr_SetString(PyExc_OSError, "handle is invalid"); \ | ||
| return NULL; \ | ||
| } | ||
| typedef struct { | ||
@@ -35,7 +41,4 @@ PyObject_HEAD | ||
| if (self->handle == INVALID_HANDLE) { | ||
| PyErr_SetString(PyExc_AssertionError, "handle is invalid"); | ||
| return NULL; | ||
| } | ||
| CHECKHANDLE(self); | ||
| if (!PyArg_ParseTuple(args, "s#", &buffer, &length)) | ||
@@ -61,6 +64,3 @@ return NULL; | ||
| if (self->handle == INVALID_HANDLE) { | ||
| PyErr_SetString(PyExc_AssertionError, "handle is invalid"); | ||
| return NULL; | ||
| } | ||
| CHECKHANDLE(self); | ||
@@ -92,6 +92,3 @@ Py_BEGIN_ALLOW_THREADS | ||
| if (self->handle == INVALID_HANDLE) { | ||
| PyErr_SetString(PyExc_AssertionError, "handle is invalid"); | ||
| return NULL; | ||
| } | ||
| CHECKHANDLE(self); | ||
@@ -133,6 +130,3 @@ if (!PyArg_ParseTuple(args, "w#", &buffer, &length)) | ||
| if (self->handle == INVALID_HANDLE) { | ||
| PyErr_SetString(PyExc_AssertionError, "handle is invalid"); | ||
| return NULL; | ||
| } | ||
| CHECKHANDLE(self); | ||
@@ -172,6 +166,3 @@ if (!PyArg_ParseTuple(args, "O", &obj)) | ||
| if (self->handle == INVALID_HANDLE) { | ||
| PyErr_SetString(PyExc_AssertionError, "handle is invalid"); | ||
| return NULL; | ||
| } | ||
| CHECKHANDLE(self); | ||
@@ -205,6 +196,3 @@ Py_BEGIN_ALLOW_THREADS | ||
| { | ||
| if (self->handle == INVALID_HANDLE) { | ||
| PyErr_SetString(PyExc_AssertionError, "handle is invalid"); | ||
| return NULL; | ||
| } | ||
| CHECKHANDLE(self); | ||
@@ -220,7 +208,4 @@ return Py_BuildValue("i", self->handle); | ||
| if (self->handle == INVALID_HANDLE) { | ||
| PyErr_SetString(PyExc_AssertionError, "handle is invalid"); | ||
| return NULL; | ||
| } | ||
| CHECKHANDLE(self); | ||
| if (! PyArg_ParseTuple(args, "|d", &timeout)) | ||
@@ -246,3 +231,5 @@ return NULL; | ||
| if (self->handle != INVALID_HANDLE) { | ||
| Py_BEGIN_ALLOW_THREADS | ||
| _close(self->handle); | ||
| Py_END_ALLOW_THREADS | ||
| self->handle = INVALID_HANDLE; | ||
@@ -360,261 +347,2 @@ } | ||
| #ifdef FALSE_SOCKET | ||
| /* | ||
| * To support sharing of socket objects we need to be able to build a | ||
| * socket object from a handle/fd. Unfortunately Windows lacks the | ||
| * function `socket.fromfd()` so we define a false socket type. | ||
| */ | ||
| PyTypeObject FalseSocketType; | ||
| static PyObject * | ||
| socket_send(Connection* self, PyObject *args) | ||
| { | ||
| char *buffer = NULL; | ||
| int length, nbytes; | ||
| if (self->handle == INVALID_HANDLE) { | ||
| PyErr_SetString(PyExc_AssertionError, "handle is invalid"); | ||
| return NULL; | ||
| } | ||
| if (!PyArg_ParseTuple(args, "s#", &buffer, &length)) | ||
| return NULL; | ||
| Py_BEGIN_ALLOW_THREADS | ||
| nbytes = _write(self->handle, buffer, length); | ||
| Py_END_ALLOW_THREADS | ||
| if (nbytes < 0) | ||
| return SetExcFromNumber(nbytes); | ||
| return Py_BuildValue("i", nbytes); | ||
| } | ||
| static PyObject * | ||
| socket_recv(Connection* self, PyObject *args) | ||
| { | ||
| char *buffer = NULL; | ||
| int nbytes; | ||
| size_t length; | ||
| PyObject *result; | ||
| if (self->handle == INVALID_HANDLE) { | ||
| PyErr_SetString(PyExc_AssertionError, "handle is invalid"); | ||
| return NULL; | ||
| } | ||
| if (!PyArg_ParseTuple(args, "I", &length)) | ||
| return NULL; | ||
| if (length == 0) { | ||
| PyErr_SetString(PyExc_AssertionError, "length should be > 0"); | ||
| return NULL; | ||
| } | ||
| buffer = malloc(length); | ||
| if (buffer == NULL) | ||
| return PyErr_NoMemory(); | ||
| Py_BEGIN_ALLOW_THREADS | ||
| nbytes = _read(self->handle, buffer, length); | ||
| Py_END_ALLOW_THREADS | ||
| if (nbytes < 0) { | ||
| free(buffer); | ||
| return SetExcFromNumber(nbytes); | ||
| } | ||
| result = Py_BuildValue("s#", buffer, nbytes); | ||
| free(buffer); | ||
| return result; | ||
| } | ||
| static PyObject * | ||
| socket_sendall(Connection* self, PyObject *args) | ||
| { | ||
| char *buffer = NULL; | ||
| int length, res; | ||
| if (self->handle == INVALID_HANDLE) { | ||
| PyErr_SetString(PyExc_AssertionError, "handle is invalid"); | ||
| return NULL; | ||
| } | ||
| if (!PyArg_ParseTuple(args, "s#", &buffer, &length)) | ||
| return NULL; | ||
| Py_BEGIN_ALLOW_THREADS | ||
| res = _sendall(self->handle, buffer, length); | ||
| Py_END_ALLOW_THREADS | ||
| if (res < 0) | ||
| return SetExcFromNumber(res); | ||
| Py_RETURN_NONE; | ||
| } | ||
| static PyObject * | ||
| socket_accept(Connection* self) | ||
| { | ||
| SOCKET handle = INVALID_SOCKET; | ||
| struct sockaddr_in addr; | ||
| int addrlen; | ||
| PyObject *sock = NULL, *args = NULL; | ||
| Py_BEGIN_ALLOW_THREADS | ||
| handle = accept(self->handle, (struct sockaddr *)&addr, &addrlen); | ||
| Py_END_ALLOW_THREADS | ||
| if (handle == INVALID_SOCKET) { | ||
| SetExcFromNumber(SOME_SOCKET_ERROR); | ||
| goto ERR; | ||
| } | ||
| args = Py_BuildValue("(i)", handle); | ||
| if (!(sock = Connection_new(&FalseSocketType, args))) | ||
| goto ERR; | ||
| closesocket(handle); | ||
| Py_DECREF(args); | ||
| return Py_BuildValue("(O(si))", sock, inet_ntoa(addr.sin_addr), | ||
| ntohs(addr.sin_port)); | ||
| ERR: | ||
| if (handle != INVALID_SOCKET) | ||
| closesocket(handle); | ||
| Py_XDECREF(sock); | ||
| Py_XDECREF(args); | ||
| return NULL; | ||
| } | ||
| static PyObject * | ||
| socket_shutdown(Connection* self, PyObject *args) | ||
| { | ||
| int how, res; | ||
| if (self->handle == INVALID_HANDLE) { | ||
| PyErr_SetString(PyExc_AssertionError, "handle is invalid"); | ||
| return NULL; | ||
| } | ||
| if (!PyArg_ParseTuple(args, "i", &how)) | ||
| return NULL; | ||
| Py_BEGIN_ALLOW_THREADS | ||
| res = shutdown(self->handle, how); | ||
| Py_END_ALLOW_THREADS | ||
| if (res != 0) | ||
| return SetExcFromNumber(SOME_SOCKET_ERROR); | ||
| Py_RETURN_NONE; | ||
| } | ||
| static PyObject * | ||
| socket_close(Connection* self) | ||
| { | ||
| if (self->handle != INVALID_HANDLE) { | ||
| closesocket(self->handle); | ||
| self->handle = INVALID_HANDLE; | ||
| } | ||
| Py_RETURN_NONE; | ||
| } | ||
| static void | ||
| socket_dealloc(Connection* self) | ||
| { | ||
| Py_XDECREF(socket_close(self)); | ||
| self->ob_type->tp_free((PyObject*)self); | ||
| } | ||
| static PyObject * | ||
| socket_notimplemented(Connection* self, PyObject *args) | ||
| { | ||
| PyErr_SetString(PyExc_NotImplementedError, "not a real socket"); | ||
| return NULL; | ||
| } | ||
| static PyMethodDef socket_methods[] = { | ||
| {"close", (PyCFunction)socket_close, METH_NOARGS, | ||
| "close the connection"}, | ||
| {"fileno", (PyCFunction)Connection_fileno, METH_NOARGS, | ||
| "file descriptor or handle of the connection"}, | ||
| {"recv", (PyCFunction)socket_recv, METH_VARARGS, | ||
| "receive"}, | ||
| {"send", (PyCFunction)socket_send, METH_VARARGS, | ||
| "send"}, | ||
| {"sendall", (PyCFunction)socket_sendall, METH_VARARGS, | ||
| "sendall"}, | ||
| {"accept", (PyCFunction)socket_accept, METH_NOARGS, | ||
| "accept"}, | ||
| {"shutdown", (PyCFunction)socket_shutdown, METH_VARARGS, | ||
| "shutdown"}, | ||
| {"recv_into", (PyCFunction)socket_notimplemented, METH_VARARGS, | ||
| "not implemented"}, | ||
| {"sendto", (PyCFunction)socket_notimplemented, METH_VARARGS, | ||
| "not implemented"}, | ||
| {"recvfrom", (PyCFunction)socket_notimplemented, METH_VARARGS, | ||
| "not implemented"}, | ||
| {"recvfrom_into", (PyCFunction)socket_notimplemented, METH_VARARGS, | ||
| "not implemented"}, | ||
| {NULL} /* Sentinel */ | ||
| }; | ||
| PyTypeObject FalseSocketType = { | ||
| PyObject_HEAD_INIT(NULL) | ||
| 0, /*ob_size*/ | ||
| "_processing.falsesocket", /*tp_name*/ | ||
| sizeof(Connection), /*tp_basicsize*/ | ||
| 0, /*tp_itemsize*/ | ||
| (destructor)socket_dealloc, | ||
| /*tp_dealloc*/ | ||
| 0, /*tp_print*/ | ||
| 0, /*tp_getattr*/ | ||
| 0, /*tp_setattr*/ | ||
| 0, /*tp_compare*/ | ||
| 0, /*tp_repr*/ | ||
| 0, /*tp_as_number*/ | ||
| 0, /*tp_as_sequence*/ | ||
| 0, /*tp_as_mapping*/ | ||
| 0, /*tp_hash*/ | ||
| 0, /*tp_call*/ | ||
| 0, /*tp_str*/ | ||
| 0, /*tp_getattro*/ | ||
| 0, /*tp_setattro*/ | ||
| 0, /*tp_as_buffer*/ | ||
| Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, | ||
| /*tp_flags*/ | ||
| "Pretend socket type used because Windows lacks socket.fromfd().\n" | ||
| "The constructor takes an fd/handle as its argument.\n" | ||
| "The instance uses a duplicated copy of the fd/handle.", | ||
| /*tp_doc*/ | ||
| 0, /*tp_traverse*/ | ||
| 0, /*tp_clear*/ | ||
| 0, /*tp_richcompare*/ | ||
| 0, /*tp_weaklistoffset*/ | ||
| 0, /*tp_iter*/ | ||
| 0, /*tp_iternext*/ | ||
| socket_methods, /*tp_methods*/ | ||
| 0, /*tp_members*/ | ||
| 0, /*tp_getset*/ | ||
| 0, /*tp_base*/ | ||
| 0, /*tp_dict*/ | ||
| 0, /*tp_descr_get*/ | ||
| 0, /*tp_descr_set*/ | ||
| 0, /*tp_dictoffset*/ | ||
| 0, /*tp_init*/ | ||
| 0, /*tp_alloc*/ | ||
| (newfunc)Connection_new, /*tp_new*/ | ||
| }; | ||
| #endif /* FALSE_SOCKET */ | ||
| #endif /* _CONNECTION_H */ |
@@ -230,3 +230,4 @@ #include "Python.h" | ||
| if (length > self->msgsize) { | ||
| PyErr_SetString(PyExc_ValueError, "pickled string too long"); | ||
| PyErr_SetString(PyExc_ValueError, "pickled string too long for a posix" | ||
| " queue - try a pipe queue instead"); | ||
| goto ERR; | ||
@@ -233,0 +234,0 @@ } |
@@ -27,1 +27,47 @@ /* | ||
| #ifdef MS_WINDOWS | ||
| typedef struct { | ||
| PyObject_HEAD | ||
| SOCKET sock_fd; | ||
| int sock_family; | ||
| int sock_type; | ||
| int sock_proto; | ||
| PyObject *(*errorhandler)(void); | ||
| double sock_timeout; | ||
| } PySocketSockObject; | ||
| extern PyTypeObject *socketType; | ||
| PyObject * | ||
| socket_changefd(PyObject *self, PyObject *args) | ||
| { | ||
| PySocketSockObject *s; | ||
| int family, type, proto=0; | ||
| SOCKET fd, newfd; | ||
| /* should probably use "n" format for fd on Python 2.5 */ | ||
| if (!PyArg_ParseTuple(args, "Oiii|i", &s, &fd, &family, &type, &proto)) | ||
| return NULL; | ||
| newfd = _duplicate(fd); | ||
| if (newfd == INVALID_SOCKET) { | ||
| PyErr_SetString(PyExc_OSError, "failed to duplicate socket handle"); | ||
| return NULL; | ||
| } | ||
| if (s->sock_fd != INVALID_SOCKET) { | ||
| Py_BEGIN_ALLOW_THREADS | ||
| closesocket(s->sock_fd); | ||
| Py_END_ALLOW_THREADS | ||
| } | ||
| s->sock_fd = newfd; | ||
| s->sock_family = family; | ||
| s->sock_type = type; | ||
| s->sock_proto = proto; | ||
| Py_RETURN_NONE; | ||
| } | ||
| #endif |
+15
-7
@@ -23,6 +23,7 @@ /* | ||
| extern PyTypeObject SocketConnectionType; | ||
| extern PyTypeObject FalseSocketType; | ||
| PyObject *dumpsFunction, *loadsFunction, *BufferTooShort; | ||
| extern PyObject *socket_changefd(PyObject *self, PyObject *args); | ||
| PyObject *dumpsFunction, *loadsFunction, *BufferTooShort, *socketType; | ||
| /* | ||
@@ -260,2 +261,4 @@ * Definitions to create and manipulate pipe handles | ||
| {"changefd", (PyCFunction)socket_changefd, METH_VARARGS, ""}, | ||
| {NULL} /* Sentinel */ | ||
@@ -285,2 +288,12 @@ }; | ||
| Py_XDECREF(other_module); | ||
| /* | ||
| * Get copy of `_socket.socket` | ||
| */ | ||
| other_module = PyImport_ImportModule("_socket"); | ||
| if (!other_module) | ||
| return; | ||
| socketType = PyObject_GetAttrString(other_module, "socket"); | ||
| Py_XDECREF(other_module); | ||
@@ -310,7 +323,2 @@ /* | ||
| if (PyType_Ready(&FalseSocketType) < 0) | ||
| return; | ||
| Py_INCREF(&FalseSocketType); | ||
| PyModule_AddObject(m, "falsesocket", (PyObject*)&FalseSocketType); | ||
| if (PyType_Ready(&BlockerType) < 0) | ||
@@ -317,0 +325,0 @@ return; |
+50
-371
| # | ||
| # Module implementing synchronization primitives and queues. | ||
| # Module implementing synchronization primitives | ||
| # | ||
@@ -8,7 +8,5 @@ # processing/synchronize.py | ||
| # | ||
| # The C extension `_processing` will be used if available. | ||
| # | ||
| __all__ = [ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', | ||
| 'Event', 'PipeQueue', 'BufferedPipeQueue' ] | ||
| 'Event'] | ||
@@ -18,8 +16,6 @@ import threading | ||
| import sys | ||
| import process | ||
| import _processing | ||
| import collections | ||
| import itertools | ||
| from Queue import Full, Empty | ||
| from processing import process, _processing | ||
| from struct import pack as _pack, unpack as _unpack, calcsize as _calcsize | ||
@@ -49,22 +45,33 @@ from time import time as _time, sleep as _sleep | ||
| def __init__(self, kind, value, name): | ||
| def __init__(self, kind, value): | ||
| counter = _nextid() | ||
| name = '/pys-%s-%s' % (os.getpid(), counter) | ||
| self._block = _processing.Blocker( | ||
| name=name, create=True, kind=kind, value=value | ||
| ) | ||
| process.debug('creating blocker with name %r' % name) | ||
| if name is None: | ||
| name = '/pys-%s-%s' % (os.getpid(), counter) | ||
| self._block = _processing.Blocker( | ||
| name=name, create=True, kind=kind, value=value | ||
| ) | ||
| process.debug('Creating blocker with name %r' % name) | ||
| if sys.platform != 'win32': | ||
| # On Unix we immediately unlink the name of the | ||
| # semaphore since otherwise the semaphore might not | ||
| # get removed (till the next reboot) if python gets | ||
| # killed. This means that `Blocker` objects are not | ||
| # picklable on Unix, but that does not prevent a child | ||
| # process from using a `Blocker` object inherited from | ||
| # its parent. | ||
| self._block._unlink() | ||
| if sys.platform != 'win32': | ||
| # On Unix we immediately unlink the name of the | ||
| # semaphore since otherwise the semaphore might not | ||
| # get removed (till the next reboot) if python gets | ||
| # killed. This means that `Blocker` objects are not | ||
| # picklable on Unix, but that does not prevent a child | ||
| # process from using a `Blocker` object inherited from | ||
| # its parent. | ||
| self._block._unlink() | ||
| else: | ||
| state = (kind, value, name) | ||
| self.__setstate__(state) | ||
| def __getstate__(self): | ||
| if sys.platform != 'win32': | ||
| raise NotImplemented | ||
| return self._state | ||
| def __setstate__(self, state): | ||
| (kind, value, name) = self._state = state | ||
| self._initvalue = value | ||
| if not hasattr(self, '_block'): | ||
| process.debug('opening blocker with name %r' % name) | ||
| self._block = _processing.Blocker( | ||
@@ -74,9 +81,2 @@ name=name, create=False, kind=kind, value=value | ||
| self._name = name | ||
| if kind == BOUNDED_SEMAPHORE: | ||
| self._maxvalue = value | ||
| else: | ||
| self._maxvalue = -1 | ||
| self._kind = kind | ||
| if kind in (MUTEX, RECURSIVE_MUTEX) and sys.platform != 'win32': | ||
@@ -99,5 +99,2 @@ # On Unix a semaphore masquerading as a mutex will not be | ||
| def __reduce__(self): | ||
| raise NotImplementedError | ||
| # | ||
@@ -109,4 +106,4 @@ # Semaphore | ||
| def __init__(self, value=1, _name=None): | ||
| Blocker.__init__(self, SEMAPHORE, value, _name) | ||
| def __init__(self, value=1): | ||
| Blocker.__init__(self, SEMAPHORE, value) | ||
@@ -124,7 +121,2 @@ def getValue(self): | ||
| if sys.platform == 'win32': | ||
| def __reduce__(self): | ||
| return (type(self), (-1, self._name)) | ||
| # | ||
@@ -136,4 +128,4 @@ # Bounded semaphore | ||
| def __init__(self, value=1, _name=None): | ||
| Blocker.__init__(self, BOUNDED_SEMAPHORE, value, _name) | ||
| def __init__(self, value=1): | ||
| Blocker.__init__(self, BOUNDED_SEMAPHORE, value) | ||
@@ -143,3 +135,3 @@ def __repr__(self): | ||
| return '<BoundedSemaphore(value=%r, maxvalue=%r)>' % \ | ||
| (self._block._getvalue(), self._maxvalue) | ||
| (self._block._getvalue(), self._initvalue) | ||
| except (KeyboardInterrupt, SystemExit): | ||
@@ -150,7 +142,2 @@ raise | ||
| if sys.platform == 'win32': | ||
| def __reduce__(self): | ||
| return (type(self), (self._maxvalue, self._name)) | ||
| # | ||
@@ -162,4 +149,4 @@ # Non-recursive lock -- releasing an unowned lock raises AssertionError | ||
| def __init__(self, _name=None): | ||
| Blocker.__init__(self, MUTEX, 1, _name) | ||
| def __init__(self): | ||
| Blocker.__init__(self, MUTEX, 1) | ||
@@ -174,7 +161,2 @@ def __repr__(self): | ||
| if sys.platform == 'win32': | ||
| def __reduce__(self): | ||
| return (type(self), (self._name,)) | ||
| # | ||
@@ -187,3 +169,3 @@ # Recursive lock | ||
| def __init__(self, _name=None): | ||
| Blocker.__init__(self, RECURSIVE_MUTEX, 1, _name) | ||
| Blocker.__init__(self, RECURSIVE_MUTEX, 1) | ||
@@ -209,7 +191,2 @@ def __repr__(self): | ||
| if sys.platform == 'win32': | ||
| def __reduce__(self): | ||
| return (type(self), (self._name,)) | ||
| # | ||
@@ -221,10 +198,15 @@ # Condition variable | ||
| def __init__(self, lock=None, _extra=None): | ||
| self._lock = lock or RLock() | ||
| if _extra is None: | ||
| _extra = (Semaphore(0), Semaphore(0), Semaphore(0)) | ||
| self._sleeping_count, self._woken_count, self._wait_semaphore = _extra | ||
| def __init__(self, lock=None): | ||
| state = (lock or RLock(), Semaphore(0), Semaphore(0), Semaphore(0)) | ||
| self.__setstate__(state) | ||
| def __setstate__(self, state): | ||
| (self._lock, self._sleeping_count, | ||
| self._woken_count, self._wait_semaphore) = self._state = state | ||
| self.acquire = self._lock.acquire | ||
| self.release = self._lock.release | ||
| def __getstate__(self): | ||
| return self._state | ||
| def __repr__(self): | ||
@@ -293,8 +275,2 @@ try: | ||
| if sys.platform == 'win32': | ||
| def __reduce__(self): | ||
| return Condition, (self._lock, (self._sleeping_count, | ||
| self._woken_count, self._wait_semaphore)) | ||
| # | ||
@@ -338,298 +314,1 @@ # Event | ||
| # | ||
| # A Queue bases on a pipe | ||
| # | ||
| class PipeQueue(object): | ||
| def __init__(self, maxsize=0, _extra=None): | ||
| self._maxsize = maxsize | ||
| if _extra is None: | ||
| if sys.platform == 'win32': | ||
| from processing.connection import Listener, Client | ||
| l = Listener() | ||
| reader = Client(l.address) | ||
| writer = l.accept() | ||
| else: | ||
| rfd, wfd = os.pipe() | ||
| reader = _processing.SocketConnection(rfd) | ||
| writer = _processing.SocketConnection(wfd) | ||
| rlock = RLock() | ||
| wlock = RLock() | ||
| if maxsize <= 0: | ||
| wsem = None | ||
| else: | ||
| wsem = BoundedSemaphore(maxsize) | ||
| self._extra = reader, writer, rlock, wlock, wsem | ||
| else: | ||
| self._extra = reader, writer, rlock, wlock, wsem = _extra | ||
| self._wsem = wsem | ||
| self._read_acquire = rlock.acquire | ||
| self._read_acquire_timeout = rlock._block.acquire_timeout | ||
| self._read_release = rlock.release | ||
| self._write_acquire = wlock.acquire | ||
| self._write_acquire_timeout = wlock._block.acquire_timeout | ||
| self._write_release = wlock.release | ||
| self._put = writer.send | ||
| self._get = reader.recv | ||
| self._poll = reader.poll | ||
| def get(self, block=1, timeout=None): | ||
| if block == 1 and timeout is None: | ||
| self._read_acquire() | ||
| try: | ||
| res = self._get() | ||
| if self._wsem: | ||
| self._wsem.release() | ||
| return res | ||
| finally: | ||
| self._read_release() | ||
| else: | ||
| if block == 0: | ||
| timeout = 0.0 | ||
| else: | ||
| timeout = max(0.0, timeout) | ||
| deadline = _time() + timeout | ||
| if not self._read_acquire_timeout(timeout): | ||
| raise Empty | ||
| try: | ||
| timeout = max(0.0, deadline - _time()) | ||
| if not self._poll(timeout): | ||
| raise Empty | ||
| res = self._get() | ||
| if self._wsem: | ||
| self._wsem.release() | ||
| return res | ||
| finally: | ||
| self._read_release() | ||
| def put(self, obj, block=1, timeout=None): | ||
| if block == 1 and timeout is None: | ||
| self._write_acquire() | ||
| try: | ||
| if self._wsem: | ||
| self._wsem.acquire() | ||
| self._put(obj) | ||
| finally: | ||
| self._write_release() | ||
| else: | ||
| if block == 0: | ||
| timeout = 0.0 | ||
| else: | ||
| timeout = max(0.0, timeout) | ||
| deadline = _time() + timeout | ||
| if not self._write_acquire_timeout(timeout): | ||
| raise Full | ||
| try: | ||
| if self._wsem: | ||
| timeout = max(0.0, deadline - _time()) | ||
| if not self._wsem._block.acquire_timeout(timeout): | ||
| raise Full | ||
| self._put(obj) | ||
| finally: | ||
| self._write_release() | ||
| def qsize(self): | ||
| raise NotImplementedError | ||
| def empty(self): | ||
| return not self._poll() | ||
| def full(self): | ||
| return bool(self._wsem) and self._wsem._block._getvalue() == 0 | ||
| def get_nowait(self, obj): | ||
| return self.get(obj, False) | ||
| def put_nowait(self, obj): | ||
| return self.put(obj, False) | ||
| if sys.platform == 'win32': | ||
| def __reduce__(self): | ||
| return type(self), (self._maxsize, self._extra) | ||
| Queue = PipeQueue | ||
| # | ||
| # Queue based on a posix message queue | ||
| # | ||
| if hasattr(_processing, 'Queue'): | ||
| class PosixQueue(_processing.Queue): | ||
| _count = 0 | ||
| _count_lock = threading.RLock() | ||
| _defaults = None | ||
| def __init__(self, maxsize=0, msgsize=0, _name=None): | ||
| assert maxsize >= 0 and msgsize >= 0 | ||
| PosixQueue._count_lock.acquire() | ||
| try: | ||
| PosixQueue._count += 1 | ||
| count = PosixQueue._count | ||
| finally: | ||
| PosixQueue._count_lock.release() | ||
| if _name is None: | ||
| _name = '/pyq-%s-%s' % (os.getpid(), count) | ||
| if (maxsize, msgsize) != (0, 0): | ||
| defmaxsize, defmsgsize = self.getdefaults() | ||
| maxsize = maxsize or defmaxsize | ||
| msgsize = msgsize or defmsgsize | ||
| _processing.Queue.__init__(self, maxsize, msgsize, _name, True) | ||
| # We immediately unlink the name of the queue since | ||
| # otherwise the queue might not get removed (till the next | ||
| # reboot) if python gets killed. This means that `Queue` | ||
| # objects are not picklable, but that does not prevent a | ||
| # child process from using a `Queue` object inherited from | ||
| # its parent. | ||
| self._unlink() | ||
| def get_nowait(self): | ||
| return self.get(False) | ||
| def put_nowait(self, item): | ||
| return self.put(item, False) | ||
| # staticmethod | ||
| def getdefaults(): | ||
| PosixQueue._count_lock.acquire() | ||
| try: | ||
| if PosixQueue._defaults is None: | ||
| temp = PosixQueue() | ||
| PosixQueue._defaults = (temp._maxmsg, temp._msgsize) | ||
| temp._close() | ||
| return PosixQueue._defaults | ||
| finally: | ||
| PosixQueue._count_lock.release() | ||
| getdefaults = staticmethod(getdefaults) | ||
| Queue = PosixQueue | ||
| __all__ += ['PosixQueue', 'BufferedPosixQueue'] | ||
| # | ||
| # Buffered versions of queues | ||
| # | ||
| _sentinel = object() | ||
| class _BufferedQueue(object): | ||
| def __init__(self, _queue, **kwds): | ||
| self._queue = _queue | ||
| self._notempty = threading.Condition(threading.Lock()) | ||
| self._buffer = collections.deque() | ||
| self._thread = None | ||
| for attr in ('get', 'empty', 'full', 'qsize', | ||
| 'put_nowait', 'get_nowait'): | ||
| method = getattr(self._queue, attr) | ||
| setattr(self, attr, method) | ||
| def put(self, obj): | ||
| self._buffer.append(obj) | ||
| self._notempty.acquire() | ||
| try: | ||
| if self._thread is None: | ||
| self._startthread() | ||
| self._notempty.notify() | ||
| finally: | ||
| self._notempty.release() | ||
| def putmany(self, iterable): | ||
| self._buffer.extend(iterable) | ||
| self._notempty.acquire() | ||
| try: | ||
| if self._thread is None: | ||
| self._startthread() | ||
| self._notempty.notify() | ||
| finally: | ||
| self._notempty.release() | ||
| def close(self): # will be overwritten by _startthread() | ||
| pass | ||
| def _startthread(self): | ||
| self._thread = threading.Thread( | ||
| target=_BufferedQueue._feed, | ||
| args=[self._buffer, self._notempty, self._queue.put] | ||
| ) | ||
| self.close = process.Finalize( | ||
| self, _BufferedQueue._finalize_queue, | ||
| [self._buffer, self._notempty, self._thread], | ||
| atexit=True | ||
| ) | ||
| # We make the thread daemonic only to prevent the exit handler | ||
| # of `threading` from trying to join it before the finalizer | ||
| # `self.close()` gets a chance to stop and join it. | ||
| self._thread.setDaemon(True) | ||
| self._thread.start() | ||
| # staticmethod | ||
| def _finalize_queue(buffer, notempty, thread): | ||
| process.debug('closing thread used by buffered queue') | ||
| notempty.acquire() | ||
| try: | ||
| buffer.append(_sentinel) | ||
| notempty.notify() | ||
| finally: | ||
| notempty.release() | ||
| thread.join() | ||
| _finalize_queue = staticmethod(_finalize_queue) | ||
| # staticmethod | ||
| def _feed(buffer, notempty, put): | ||
| while 1: | ||
| notempty.acquire() | ||
| try: | ||
| if not buffer: | ||
| notempty.wait() | ||
| finally: | ||
| notempty.release() | ||
| try: | ||
| while 1: | ||
| obj = buffer.popleft() | ||
| if obj is _sentinel: | ||
| return | ||
| put(obj) | ||
| except IndexError: | ||
| pass | ||
| _feed = staticmethod(_feed) | ||
| class BufferedPipeQueue(_BufferedQueue): | ||
| def __init__(self, maxsize=0, _queue=None): | ||
| self._queue = _queue or PipeQueue(maxsize) | ||
| self._maxsize = maxsize | ||
| _BufferedQueue.__init__(self, self._queue) | ||
| def __reduce__(self): | ||
| return (BufferedPipeQueue, (self._maxsize, self._queue)) | ||
| if hasattr(_processing, 'Queue'): | ||
| class BufferedPosixQueue(_BufferedQueue): | ||
| def __init__(self, maxsize=0, msgsize=0): | ||
| _BufferedQueue.__init__(self, PosixQueue(maxsize, msgsize)) | ||
@@ -24,3 +24,3 @@ # | ||
| def note(format, *args): | ||
| sys.stderr.write('[%s]\t%s\n' % (currentProcess()._name, format % args)) | ||
| sys.stderr.write('[%s]\t%s\n' % (currentProcess().getName(), format%args)) | ||
@@ -27,0 +27,0 @@ |
@@ -55,10 +55,2 @@ # | ||
| class A(object): | ||
| # staticmethod | ||
| def f(): | ||
| return 'A.f()' | ||
| f = staticmethod(f) | ||
| ## | ||
| def test(): | ||
@@ -65,0 +57,0 @@ manager = MyManager() |
+73
-14
@@ -94,9 +94,16 @@ # | ||
| A = map(pow3, xrange(N)) | ||
| print '\tmap(pow3, xrange(%d)):\t%s seconds' % (N, time.time() - t) | ||
| print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \ | ||
| (N, time.time() - t) | ||
| t = time.time() | ||
| B = pool.map(pow3, xrange(N)) | ||
| print '\tpool.map(pow3, xrange(%d)):\t%s seconds' % (N, time.time() - t) | ||
| print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \ | ||
| (N, time.time() - t) | ||
| t = time.time() | ||
| C = list(pool.imap(pow3, xrange(N), chunksize=10000)) | ||
| print '\tlist(pool.imap(pow3, xrange(%d), chunksize=10000)):\n\t\t%s' \ | ||
| ' seconds' % (N, time.time() - t) | ||
| assert A == B, (len(A), len(B)) | ||
| assert A == B == C, (len(A), len(B), len(C)) | ||
@@ -110,12 +117,19 @@ | ||
| A = map(noop, L) | ||
| print '\tmap(noop, L):\t\t\t%s seconds' % (time.time() - t) | ||
| print '\tmap(noop, L):\n\t\t%s seconds' % \ | ||
| (time.time() - t) | ||
| t = time.time() | ||
| B = pool.map(noop, L) | ||
| print '\tpool.map(noop, L):\t\t%s seconds' % (time.time() - t) | ||
| print '\tpool.map(noop, L):\n\t\t%s seconds' % \ | ||
| (time.time() - t) | ||
| assert A == B, (len(A), len(B)) | ||
| t = time.time() | ||
| C = list(pool.imap(noop, L, chunksize=10000)) | ||
| print '\tlist(pool.imap(noop, L, chunksize=10000)):\n\t\t%s seconds' % \ | ||
| (time.time() - t) | ||
| assert A == B == C, (len(A), len(B), len(C)) | ||
| del A, B, L | ||
| del A, B, C, L | ||
@@ -211,5 +225,5 @@ # | ||
| if A == B: | ||
| print '\tcallbacks succeeded' | ||
| print '\tcallbacks succeeded\n' | ||
| else: | ||
| print '\t*** callbacks failed\n\t\t%s != %s' % (A, B) | ||
| print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B) | ||
@@ -223,18 +237,63 @@ # | ||
| # | ||
| # Check that processes get stopped when pool is deleted | ||
| # Check close() methods | ||
| # | ||
| processes = pool._pool | ||
| print 'Testing close():' | ||
| for worker in processes: | ||
| for worker in pool._pool: | ||
| assert worker.isAlive() | ||
| del pool | ||
| result = pool.apply_async(time.sleep, [0.5]) | ||
| pool.close() | ||
| pool.join() | ||
| assert result.get() is None | ||
| for worker in pool._pool: | ||
| assert not worker.isAlive() | ||
| print '\tclose() succeeded\n' | ||
| # | ||
| # Check terminate() method | ||
| # | ||
| print 'Testing terminate():' | ||
| pool = Pool(2) | ||
| ignore = pool.apply(pow3, [2]) | ||
| results = [pool.apply_async(time.sleep, [10]) for i in range(10)] | ||
| pool.terminate() | ||
| pool.join() | ||
| for worker in pool._pool: | ||
| assert not worker.isAlive() | ||
| print '\tterminate() succeeded\n' | ||
| # | ||
| # Check garbage collection | ||
| # | ||
| print 'Testing garbage collection:' | ||
| pool = Pool(2) | ||
| processes = pool._pool | ||
| ignore = pool.apply(pow3, [2]) | ||
| results = [pool.apply_async(time.sleep, [10]) for i in range(10)] | ||
| del results, pool | ||
| time.sleep(0.2) | ||
| for worker in processes: | ||
| assert not worker.isAlive() | ||
| print '\tgarbage collection succeeded\n' | ||
| if __name__ == '__main__': | ||
| freezeSupport() | ||
| test() | ||
+9
-12
@@ -187,12 +187,8 @@ # | ||
| threading.Condition()) | ||
| if hasattr(processing, 'PosixQueue'): | ||
| print '\n\t######## testing processing.PosixQueue\n' | ||
| test_queuespeed(processing.Process, processing.PosixQueue(), | ||
| processing.Condition()) | ||
| print '\n\t######## testing processing.BufferedPosixQueue\n' | ||
| test_queuespeed(processing.Process, processing.BufferedPosixQueue(), | ||
| processing.Condition()) | ||
| print '\n\t######## testing processing.PipeQueue\n' | ||
| test_queuespeed(processing.Process, processing.PipeQueue(), | ||
| print '\n\t######## testing processing.Queue\n' | ||
| test_queuespeed(processing.Process, processing.Queue(), | ||
| processing.Condition()) | ||
| print '\n\t######## testing processing.SimpleQueue\n' | ||
| test_queuespeed(processing.Process, processing.SimpleQueue(), | ||
| processing.Condition()) | ||
| print '\n\t######## testing Queue managed by server process\n' | ||
@@ -203,5 +199,6 @@ test_queuespeed(processing.Process, manager.Queue(), | ||
| test_pipespeed() | ||
| print '\n\t######## testing processing.BufferedPipeQueue\n' | ||
| test_queuespeed(processing.Process, processing.BufferedPipeQueue(), | ||
| processing.Condition()) | ||
| if hasattr(processing, 'PosixQueue'): | ||
| print '\n\t######## testing processing.PosixQueue\n' | ||
| test_queuespeed(processing.Process, processing.PosixQueue(), | ||
| processing.Condition()) | ||
@@ -208,0 +205,0 @@ print |
@@ -6,3 +6,3 @@ import time, sys, processing | ||
| for i in range(50): | ||
| for i in range(25): | ||
| try: | ||
@@ -54,3 +54,3 @@ time.sleep(0.1) | ||
| p.start() | ||
| time.sleep(3) | ||
| time.sleep(1.5) | ||
| p.stop() | ||
@@ -63,3 +63,3 @@ p.join() | ||
| p.start() | ||
| time.sleep(3) | ||
| time.sleep(1.5) | ||
| print >>sys.stderr, '\nterminating process' | ||
@@ -66,0 +66,0 @@ p.terminate() |
+11
-20
| # | ||
| # Simple example which uses a pool of workers to carry out some tasks. | ||
| # | ||
| # Note that a queue produced by `processing.Queue()` will have finite | ||
| # capacity meaning that its `put()` method can block. Therefore we | ||
| # use a queue using `processing.BufferedQueue()`. This has the | ||
| # guarantee that the `put()` (or `putmany()`) method will succeed | ||
| # without blocking. Therefore putting all the tasks on this queue | ||
| # will not cause a deadlock, regardless of how many tasks there are, | ||
| # or whether there is another process which is already consuming the | ||
| # tasks. | ||
| # Notice that the results will probably not come out of the output | ||
| # queue in the same in the same order as the corresponding tasks were | ||
| # put on the input queue. If it is important to get the results back | ||
| # in the original order then consider using `Pool.map()` or | ||
| # `Pool.imap()` (which will save on the amount of code needed anyway). | ||
| # | ||
| # Also notice that the results will probably not come out of the | ||
| # output queue in the same in the same order as the corresponding | ||
| # tasks were put on the input queue. If it is important to get the | ||
| # results back in the original order then consider using `Pool.map()` | ||
| # or `Pool.imap()`. | ||
| # | ||
@@ -67,8 +58,7 @@ import time | ||
| # Create queues | ||
| task_queue = BufferedQueue() | ||
| task_queue = Queue() | ||
| done_queue = Queue() | ||
| # Submit tasks -- no deadlock possible since `task_queue` is buffered | ||
| for task in TASKS1: | ||
| task_queue.put(task) | ||
| # Submit tasks | ||
| task_queue.putmany(TASKS1) | ||
@@ -84,4 +74,5 @@ # Start worker processes | ||
| # Add more tasks -- `putmany()` is an alternative to `put()` and a for-loop | ||
| task_queue.putmany(TASKS2) | ||
| # Add more tasks using `put()` instead of `putmany()` | ||
| for task in TASKS2: | ||
| task_queue.put(task) | ||
@@ -88,0 +79,0 @@ # Get and print some more results |
+2
-2
@@ -8,5 +8,5 @@ ======== | ||
| Alexey Akimov, Michele Bertoldi, Josiah Carlson, Lisandro Dalcin, | ||
| Markus Gritsch, Doug Hellmann, Charlie Hull, Richard Jones, Kevin | ||
| Manley, Dominique Wahli. | ||
| Markus Gritsch, Doug Hellmann, Charlie Hull, Richard Jones, Gerald | ||
| John M. Manipon, Kevin Manley, Paul Rudin, Dominique Wahli. | ||
| Sorry if I have forgotten anyone. |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
518153
0.09%65
1.56%5737
2.81%