processing
Advanced tools
| <?xml version="1.0" encoding="utf-8" ?> | ||
| <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> | ||
| <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en"> | ||
| <head> | ||
| <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> | ||
| <meta name="generator" content="Docutils 0.4: http://docutils.sourceforge.net/" /> | ||
| <title>Connection objects</title> | ||
| <link rel="stylesheet" href="html4css1.css" type="text/css" /> | ||
| </head> | ||
| <body> | ||
| <div class="header"> | ||
| <a class="reference" href="queue-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="manager-objects.html">Next</a> | ||
| <hr class="header"/> | ||
| </div> | ||
| <div class="document" id="connection-objects"> | ||
| <h1 class="title">Connection objects</h1> | ||
| <p>Connection objects allow the sending and receiving of picklable | ||
| objects or strings. They can be thought of a message oriented | ||
| connected sockets.</p> | ||
| <p>Connection objects usually created using <tt class="docutils literal"><span class="pre">processing.Pipe()</span></tt> -- see | ||
| also <a class="reference" href="connection-ref.html">Listener and Clients</a>.</p> | ||
| <p>Connection objects have the following methods:</p> | ||
| <blockquote> | ||
| <dl class="docutils"> | ||
| <dt><tt class="docutils literal"><span class="pre">send(obj)</span></tt></dt> | ||
| <dd><p class="first">Send an object to the other end of the connection which should | ||
| be read using <tt class="docutils literal"><span class="pre">recv()</span></tt>.</p> | ||
| <p class="last">The object must be picklable.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">recv()</span></tt></dt> | ||
| <dd>Return an object sent from the other end of the connection | ||
| using <tt class="docutils literal"><span class="pre">send()</span></tt>.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">fileno()</span></tt></dt> | ||
| <dd>Returns the file descriptor or handle used by the connection.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt> | ||
| <dd><p class="first">Close the connection.</p> | ||
| <p class="last">This is called automatically when the connection is garbage | ||
| collected.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">poll(timeout=0)</span></tt></dt> | ||
| <dd>Return whether there is any data available to be read within | ||
| <tt class="docutils literal"><span class="pre">timeout</span></tt> seconds.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">sendbytes(buffer)</span></tt></dt> | ||
| <dd><p class="first">Send byte data from an object supporting the buffer interface | ||
| as a complete message.</p> | ||
| <p class="last">Can be used to send strings or a view returned by <tt class="docutils literal"><span class="pre">buffer()</span></tt>.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">recvbytes()</span></tt></dt> | ||
| <dd>Return a complete message of byte data sent from the other end | ||
| of the connection as a string.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">recvbytes_into(buffer,</span> <span class="pre">offset=0)</span></tt></dt> | ||
| <dd><p class="first">Read into <tt class="docutils literal"><span class="pre">buffer</span></tt> at position <tt class="docutils literal"><span class="pre">offset</span></tt> a complete message of | ||
| byte data sent from the other end of the connection and return | ||
| the number of bytes in the message.</p> | ||
| <p><tt class="docutils literal"><span class="pre">buffer</span></tt> must be an object satisfying the writable buffer | ||
| interface and <tt class="docutils literal"><span class="pre">offset</span></tt> must be non-negative and less than | ||
| the length of <tt class="docutils literal"><span class="pre">buffer</span></tt> (in bytes).</p> | ||
| <p class="last">If the buffer is too short then a <tt class="docutils literal"><span class="pre">BufferTooShort</span></tt> exception | ||
| is raised and the complete message of bytes data is available | ||
| as <tt class="docutils literal"><span class="pre">e.args[0]</span></tt> where <tt class="docutils literal"><span class="pre">e</span></tt> is the exception instance.</p> | ||
| </dd> | ||
| </dl> | ||
| </blockquote> | ||
| <p>For example:</p> | ||
| <blockquote> | ||
| <pre class="doctest-block"> | ||
| >>> from processing import Pipe | ||
| >>> a, b = Pipe() | ||
| >>> a.send([1, 'hello', None]) | ||
| >>> b.recv() | ||
| [1, 'hello', None] | ||
| >>> b.sendbytes('thank you') | ||
| >>> a.recvbytes() | ||
| 'thank you' | ||
| >>> import array | ||
| >>> arr1 = array.array('i', range(5)) | ||
| >>> arr2 = array.array('i', [0] * 10) | ||
| >>> a.sendbytes(arr1) | ||
| >>> b.recvbytes_into(arr2) | ||
| 20 | ||
| >>> arr2 | ||
| array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0]) | ||
| </pre> | ||
| </blockquote> | ||
| <div class="warning"> | ||
| <p class="first admonition-title">Warning</p> | ||
| <p>The <tt class="docutils literal"><span class="pre">recv()</span></tt> method automatically unpickles the data it receives | ||
| which can be a security risk unless you can trust the process | ||
| which sent the message.</p> | ||
| <p class="last">Therefore, unless the connection object was produced using | ||
| <tt class="docutils literal"><span class="pre">Pipe()</span></tt> you should only use the <tt class="docutils literal"><span class="pre">recv()</span></tt> and <tt class="docutils literal"><span class="pre">send()</span></tt> methods | ||
| after performing some sort of authentication. See <a class="reference" href="connection-ref.html#authentication-keys">Authentication | ||
| keys</a>.</p> | ||
| </div> | ||
| <div class="warning"> | ||
| <p class="first admonition-title">Warning</p> | ||
| <p class="last">If a process is killed while it is trying to read or write to a | ||
| pipe then the data in the pipe is likely to become corrupted | ||
| because it may become impossible to be sure where the message | ||
| boundaries lie.</p> | ||
| </div> | ||
| </div> | ||
| <div class="footer"> | ||
| <hr class="footer" /> | ||
| <a class="reference" href="queue-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="manager-objects.html">Next</a> | ||
| </div> | ||
| </body> | ||
| </html> |
| .. include:: header.txt | ||
| ==================== | ||
| Connection objects | ||
| ==================== | ||
| Connection objects allow the sending and receiving of picklable | ||
| objects or strings. They can be thought of a message oriented | ||
| connected sockets. | ||
| Connection objects usually created using `processing.Pipe()` -- see | ||
| also `Listener and Clients <connection-ref.html>`_. | ||
| Connection objects have the following methods: | ||
| `send(obj)` | ||
| Send an object to the other end of the connection which should | ||
| be read using `recv()`. | ||
| The object must be picklable. | ||
| `recv()` | ||
| Return an object sent from the other end of the connection | ||
| using `send()`. | ||
| `fileno()` | ||
| Returns the file descriptor or handle used by the connection. | ||
| `close()` | ||
| Close the connection. | ||
| This is called automatically when the connection is garbage | ||
| collected. | ||
| `poll(timeout=0)` | ||
| Return whether there is any data available to be read within | ||
| `timeout` seconds. | ||
| `sendbytes(buffer)` | ||
| Send byte data from an object supporting the buffer interface | ||
| as a complete message. | ||
| Can be used to send strings or a view returned by `buffer()`. | ||
| `recvbytes()` | ||
| Return a complete message of byte data sent from the other end | ||
| of the connection as a string. | ||
| `recvbytes_into(buffer, offset=0)` | ||
| Read into `buffer` at position `offset` a complete message of | ||
| byte data sent from the other end of the connection and return | ||
| the number of bytes in the message. | ||
| `buffer` must be an object satisfying the writable buffer | ||
| interface and `offset` must be non-negative and less than | ||
| the length of `buffer` (in bytes). | ||
| If the buffer is too short then a `BufferTooShort` exception | ||
| is raised and the complete message of bytes data is available | ||
| as `e.args[0]` where `e` is the exception instance. | ||
| For example: | ||
| >>> from processing import Pipe | ||
| >>> a, b = Pipe() | ||
| >>> a.send([1, 'hello', None]) | ||
| >>> b.recv() | ||
| [1, 'hello', None] | ||
| >>> b.sendbytes('thank you') | ||
| >>> a.recvbytes() | ||
| 'thank you' | ||
| >>> import array | ||
| >>> arr1 = array.array('i', range(5)) | ||
| >>> arr2 = array.array('i', [0] * 10) | ||
| >>> a.sendbytes(arr1) | ||
| >>> b.recvbytes_into(arr2) | ||
| 20 | ||
| >>> arr2 | ||
| array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0]) | ||
| .. warning:: | ||
| The `recv()` method automatically unpickles the data it receives | ||
| which can be a security risk unless you can trust the process | ||
| which sent the message. | ||
| Therefore, unless the connection object was produced using | ||
| `Pipe()` you should only use the `recv()` and `send()` methods | ||
| after performing some sort of authentication. See `Authentication | ||
| keys <connection-ref.html#authentication-keys>`_. | ||
| .. warning:: | ||
| If a process is killed while it is trying to read or write to a | ||
| pipe then the data in the pipe is likely to become corrupted | ||
| because it may become impossible to be sure where the message | ||
| boundaries lie. | ||
| .. _Prev: queue-objects.html | ||
| .. _Up: processing-ref.html | ||
| .. _Next: manager-objects.html |
| <?xml version="1.0" encoding="utf-8" ?> | ||
| <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> | ||
| <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en"> | ||
| <head> | ||
| <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> | ||
| <meta name="generator" content="Docutils 0.4: http://docutils.sourceforge.net/" /> | ||
| <title>Queue objects</title> | ||
| <link rel="stylesheet" href="html4css1.css" type="text/css" /> | ||
| </head> | ||
| <body> | ||
| <div class="header"> | ||
| <a class="reference" href="process-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="connection-objects.html">Next</a> | ||
| <hr class="header"/> | ||
| </div> | ||
| <div class="document" id="queue-objects"> | ||
| <h1 class="title">Queue objects</h1> | ||
| <p>The queue types provided by <tt class="docutils literal"><span class="pre">processing</span></tt> are multi-producer, | ||
| multi-consumer FIFO queues modelled on the <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> class in the | ||
| standard library.</p> | ||
| <p>In general it is best to stick to <tt class="docutils literal"><span class="pre">processing.Queue</span></tt> which has the | ||
| important advantage that (unless a maximum size is specified) putting | ||
| an object on the queue will always succeed without blocking. Without | ||
| this guarantee one must be much more careful to avoid the possibility | ||
| of deadlocks.</p> | ||
| <dl class="docutils"> | ||
| <dt><tt class="docutils literal"><span class="pre">Queue(maxsize=0)</span></tt></dt> | ||
| <dd><p class="first">Returns a process shared queue implemented using a pipe and a | ||
| few locks/semaphores. A background thread transfers objects | ||
| from a buffer into the pipe.</p> | ||
| <p>It is a near clone of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> except that the <tt class="docutils literal"><span class="pre">qsize()</span></tt> | ||
| method is not implemented and that the <tt class="docutils literal"><span class="pre">task_done()</span></tt> and | ||
| <tt class="docutils literal"><span class="pre">join()</span></tt> methods introduced in Python 2.5 are also missing.</p> | ||
| <p><tt class="docutils literal"><span class="pre">Queue</span></tt> has a few additional methods which are usually | ||
| unnecessary:</p> | ||
| <blockquote> | ||
| <dl class="docutils"> | ||
| <dt><tt class="docutils literal"><span class="pre">putmany(iterable)</span></tt></dt> | ||
| <dd>If the queue has infinite size then this adds all | ||
| items in the iterable to the queue's buffer. So | ||
| <tt class="docutils literal"><span class="pre">q.putmany(X)</span></tt> is a faster alternative to <tt class="docutils literal"><span class="pre">for</span> <span class="pre">x</span> <span class="pre">in</span> <span class="pre">X:</span> | ||
| <span class="pre">q.put(x)</span></tt>. Raises an error if the queue has finite | ||
| size.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt> | ||
| <dd>Indicates that no more data will be put on this queue by | ||
| the current process. The background thread will quit once | ||
| it has flushed all buffered data to the pipe. This is | ||
| called automatically when the queue is garbage collected.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">jointhread()</span></tt></dt> | ||
| <dd><p class="first">This joins the background thread and can only be used | ||
| after <tt class="docutils literal"><span class="pre">close()</span></tt> has been called. This blocks until | ||
| the background thread exits, ensuring that all data in | ||
| the buffer has been flushed to the pipe.</p> | ||
| <p class="last">By default if a process is not the creator of the | ||
| queue then on exit it will attempt to join the queue's | ||
| background thread. The process can call | ||
| <tt class="docutils literal"><span class="pre">canceljoin()</span></tt> to prevent this behaviour.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">canceljoin()</span></tt></dt> | ||
| <dd>Prevents the background thread from being joined | ||
| automatically when the process exits. Unnecessary if | ||
| the current process created the queue.</dd> | ||
| </dl> | ||
| </blockquote> | ||
| <p class="last">Note that if a process is killed while it is trying to receive | ||
| or send to a queue then the data in the queue is likely to | ||
| become corrupted (because it may become impossible to be sure | ||
| where the message boundaries lie).</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">SimpleQueue()</span></tt></dt> | ||
| <dd><p class="first">A simplified and faster alternative to <tt class="docutils literal"><span class="pre">Queue()</span></tt>. It is really | ||
| just a non-duplex pipe protected by a couple of locks.</p> | ||
| <p>It has <tt class="docutils literal"><span class="pre">get()</span></tt> and <tt class="docutils literal"><span class="pre">put()</span></tt> methods but these do not have | ||
| <tt class="docutils literal"><span class="pre">block</span></tt> or <tt class="docutils literal"><span class="pre">timeout</span></tt> arguments. It also has an <tt class="docutils literal"><span class="pre">empty()</span></tt> | ||
| method.</p> | ||
| <p class="last"><strong>Warning</strong>: Unlike with <tt class="docutils literal"><span class="pre">Queue</span></tt> (when <tt class="docutils literal"><span class="pre">maxsize</span></tt> is zero) | ||
| using <tt class="docutils literal"><span class="pre">put()</span></tt> may block if the pipe's buffer does not have | ||
| sufficient space, so one must take care that no deadlocks are | ||
| possible.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">PosixQueue(maxsize=0,</span> <span class="pre">msgsize=0)</span></tt></dt> | ||
| <dd><p class="first">A faster alternative to <tt class="docutils literal"><span class="pre">Queue()</span></tt> which is available on Unix | ||
| systems which support Posix message queues.</p> | ||
| <p>However, posix queues have a maximum number of messages that | ||
| can occupy the queue at a given time, and each message (when | ||
| expressed as a pickled string) has a maximum length --- see | ||
| <tt class="docutils literal"><span class="pre">man</span> <span class="pre">7</span> <span class="pre">mq_overview</span></tt>.</p> | ||
| <p><tt class="docutils literal"><span class="pre">maxsize</span></tt> if specified determines the maximum number of items | ||
| that be in the queue. Note that unlike Pythons's normal queue | ||
| type if this is greater than a system defined maximum then an | ||
| error is raised. If <tt class="docutils literal"><span class="pre">maxsize</span></tt> is zero then this maximum value | ||
| is used.</p> | ||
| <p><tt class="docutils literal"><span class="pre">msgsize</span></tt> if specified determines the maximum length that each | ||
| message can be (when expressed as a pickled string). If this | ||
| is greater than a system defined maximum then an error is | ||
| raised. If <tt class="docutils literal"><span class="pre">msgsize</span></tt> is zero then this maximum value is used.</p> | ||
| <p class="last">If one tries to send a message which is too long then | ||
| <tt class="docutils literal"><span class="pre">ValueError</span></tt> will be raised.</p> | ||
| </dd> | ||
| </dl> | ||
| <div class="admonition-empty-and-full admonition"> | ||
| <p class="first admonition-title"><tt class="docutils literal"><span class="pre">Empty</span></tt> and <tt class="docutils literal"><span class="pre">Full</span></tt></p> | ||
| <p class="last"><tt class="docutils literal"><span class="pre">processing</span></tt> uses the usual <tt class="docutils literal"><span class="pre">Queue.Empty</span></tt> and <tt class="docutils literal"><span class="pre">Queue.Full</span></tt> | ||
| exceptions to signal a timeout. They are not available in the | ||
| <tt class="docutils literal"><span class="pre">processing</span></tt> namespace so you need to import them from <tt class="docutils literal"><span class="pre">Queue</span></tt>.</p> | ||
| </div> | ||
| <div class="warning"> | ||
| <p class="first admonition-title">Warning</p> | ||
| <p class="last">If a process is killed while it is trying to use a <tt class="docutils literal"><span class="pre">Queue</span></tt> or | ||
| <tt class="docutils literal"><span class="pre">SimpleQueue</span></tt> then the data in the queue is likely to become | ||
| corrupted because it may become impossible to be sure where the | ||
| message boundaries lie. However, <tt class="docutils literal"><span class="pre">PosixQueue</span></tt> does not have this | ||
| problem.</p> | ||
| </div> | ||
| </div> | ||
| <div class="footer"> | ||
| <hr class="footer" /> | ||
| <a class="reference" href="process-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="connection-objects.html">Next</a> | ||
| </div> | ||
| </body> | ||
| </html> |
| .. include:: header.txt | ||
| =============== | ||
| Queue objects | ||
| =============== | ||
| The queue types provided by `processing` are multi-producer, | ||
| multi-consumer FIFO queues modelled on the `Queue.Queue` class in the | ||
| standard library. | ||
| In general it is best to stick to `processing.Queue` which has the | ||
| important advantage that (unless a maximum size is specified) putting | ||
| an object on the queue will always succeed without blocking. Without | ||
| this guarantee one must be much more careful to avoid the possibility | ||
| of deadlocks. | ||
| `Queue(maxsize=0)` | ||
| Returns a process shared queue implemented using a pipe and a | ||
| few locks/semaphores. A background thread transfers objects | ||
| from a buffer into the pipe. | ||
| It is a near clone of `Queue.Queue` except that the `qsize()` | ||
| method is not implemented and that the `task_done()` and | ||
| `join()` methods introduced in Python 2.5 are also missing. | ||
| `Queue` has a few additional methods which are usually | ||
| unnecessary: | ||
| `putmany(iterable)` | ||
| If the queue has infinite size then this adds all | ||
| items in the iterable to the queue's buffer. So | ||
| `q.putmany(X)` is a faster alternative to `for x in X: | ||
| q.put(x)`. Raises an error if the queue has finite | ||
| size. | ||
| `close()` | ||
| Indicates that no more data will be put on this queue by | ||
| the current process. The background thread will quit once | ||
| it has flushed all buffered data to the pipe. This is | ||
| called automatically when the queue is garbage collected. | ||
| `jointhread()` | ||
| This joins the background thread and can only be used | ||
| after `close()` has been called. This blocks until | ||
| the background thread exits, ensuring that all data in | ||
| the buffer has been flushed to the pipe. | ||
| By default if a process is not the creator of the | ||
| queue then on exit it will attempt to join the queue's | ||
| background thread. The process can call | ||
| `canceljoin()` to prevent this behaviour. | ||
| `canceljoin()` | ||
| Prevents the background thread from being joined | ||
| automatically when the process exits. Unnecessary if | ||
| the current process created the queue. | ||
| Note that if a process is killed while it is trying to receive | ||
| or send to a queue then the data in the queue is likely to | ||
| become corrupted (because it may become impossible to be sure | ||
| where the message boundaries lie). | ||
| `SimpleQueue()` | ||
| A simplified and faster alternative to `Queue()`. It is really | ||
| just a non-duplex pipe protected by a couple of locks. | ||
| It has `get()` and `put()` methods but these do not have | ||
| `block` or `timeout` arguments. It also has an `empty()` | ||
| method. | ||
| **Warning**: Unlike with `Queue` (when `maxsize` is zero) | ||
| using `put()` may block if the pipe's buffer does not have | ||
| sufficient space, so one must take care that no deadlocks are | ||
| possible. | ||
| `PosixQueue(maxsize=0, msgsize=0)` | ||
| A faster alternative to `Queue()` which is available on Unix | ||
| systems which support Posix message queues. | ||
| However, posix queues have a maximum number of messages that | ||
| can occupy the queue at a given time, and each message (when | ||
| expressed as a pickled string) has a maximum length --- see | ||
| `man 7 mq_overview`. | ||
| `maxsize` if specified determines the maximum number of items | ||
| that be in the queue. Note that unlike Pythons's normal queue | ||
| type if this is greater than a system defined maximum then an | ||
| error is raised. If `maxsize` is zero then this maximum value | ||
| is used. | ||
| `msgsize` if specified determines the maximum length that each | ||
| message can be (when expressed as a pickled string). If this | ||
| is greater than a system defined maximum then an error is | ||
| raised. If `msgsize` is zero then this maximum value is used. | ||
| If one tries to send a message which is too long then | ||
| `ValueError` will be raised. | ||
| .. admonition:: `Empty` and `Full` | ||
| `processing` uses the usual `Queue.Empty` and `Queue.Full` | ||
| exceptions to signal a timeout. They are not available in the | ||
| `processing` namespace so you need to import them from `Queue`. | ||
| .. warning:: | ||
| If a process is killed while it is trying to use a `Queue` or | ||
| `SimpleQueue` then the data in the queue is likely to become | ||
| corrupted because it may become impossible to be sure where the | ||
| message boundaries lie. However, `PosixQueue` does not have this | ||
| problem. | ||
| .. _Prev: process-objects.html | ||
| .. _Up: processing-ref.html | ||
| .. _Next: connection-objects.html |
| <?xml version="1.0" encoding="utf-8" ?> | ||
| <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> | ||
| <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en"> | ||
| <head> | ||
| <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> | ||
| <meta name="generator" content="Docutils 0.4: http://docutils.sourceforge.net/" /> | ||
| <title>Shared ctypes objects</title> | ||
| <link rel="stylesheet" href="html4css1.css" type="text/css" /> | ||
| </head> | ||
| <body> | ||
| <div class="header"> | ||
| <a class="reference" href="pool-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="connection-ref.html">Next</a> | ||
| <hr class="header"/> | ||
| </div> | ||
| <div class="document" id="shared-ctypes-objects"> | ||
| <h1 class="title">Shared ctypes objects</h1> | ||
| <p>The <tt class="docutils literal"><span class="pre">processing.sharedctypes</span></tt> module provides functions for allocating | ||
| ctypes objects from shared memory which can be inherited by child | ||
| processes. (See the standard library's documentation for details of | ||
| the <tt class="docutils literal"><span class="pre">ctypes</span></tt> package.)</p> | ||
| <p>Note that access to a ctypes objects is not protected by any lock. | ||
| However accessing a ctypes object can be much faster (20+ times | ||
| faster) than accessing a synchronized shared object allocated using | ||
| <tt class="docutils literal"><span class="pre">LocalManager</span></tt>.</p> | ||
| <p>The functions in the module are</p> | ||
| <blockquote> | ||
| <dl class="docutils"> | ||
| <dt><tt class="docutils literal"><span class="pre">new_value(fmt_or_type,</span> <span class="pre">*args)</span></tt></dt> | ||
| <dd><p class="first">Returns a ctypes object allocated from shared memory.</p> | ||
| <p class="last"><tt class="docutils literal"><span class="pre">fmt_or_type</span></tt> determines the type of the returned object: it | ||
| is either a ctypes type or a one character string format of | ||
| the kind used by the <tt class="docutils literal"><span class="pre">array</span></tt> module. The remaining arguments | ||
| are passed on to the constructor for the type.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">new_array(fmt_or_type,</span> <span class="pre">size_or_initializer)</span></tt></dt> | ||
| <dd><p class="first">Returns a ctypes array allocated from shared memory.</p> | ||
| <p class="last"><tt class="docutils literal"><span class="pre">fmt_or_type</span></tt> determines the type of the elements of the | ||
| returned array: it is either a ctypes type or a one character | ||
| string format of the kind used by the <tt class="docutils literal"><span class="pre">array</span></tt> module. If | ||
| <tt class="docutils literal"><span class="pre">size_or_initializer</span></tt> is an integer then it determines the | ||
| length of the array, and the array will be initially zeroed. | ||
| Otherwise <tt class="docutils literal"><span class="pre">size_or_initializer</span></tt> is a sequence which is used to | ||
| initialize the array and whose length determines the length of | ||
| the array.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">copy(obj)</span></tt></dt> | ||
| <dd>Returns a ctypes object allocated from shared memory which is | ||
| a copy of the ctypes object <tt class="docutils literal"><span class="pre">obj</span></tt>.</dd> | ||
| </dl> | ||
| </blockquote> | ||
| <div class="section"> | ||
| <h1><a id="equivalences" name="equivalences">Equivalences</a></h1> | ||
| <p>The table below compares the syntax for creating a shared ctypes | ||
| object from shared memory with the normal ctypes syntax. (In the | ||
| table <tt class="docutils literal"><span class="pre">MyStruct</span></tt> is some subclass of <tt class="docutils literal"><span class="pre">ctypes.Structure</span></tt>.)</p> | ||
| <table border="1" class="docutils"> | ||
| <colgroup> | ||
| <col width="37%" /> | ||
| <col width="36%" /> | ||
| <col width="27%" /> | ||
| </colgroup> | ||
| <thead valign="bottom"> | ||
| <tr><th class="head">sharedctypes using type</th> | ||
| <th class="head">sharedctypes using format</th> | ||
| <th class="head">ctypes</th> | ||
| </tr> | ||
| </thead> | ||
| <tbody valign="top"> | ||
| <tr><td>new_value(c_double, 2.4)</td> | ||
| <td>new_value('d', 2.4)</td> | ||
| <td>c_double(2.4)</td> | ||
| </tr> | ||
| <tr><td>new_value(MyStruct, 4, 6)</td> | ||
| <td> </td> | ||
| <td>MyStruct(4, 6)</td> | ||
| </tr> | ||
| <tr><td>new_array(c_short, 7)</td> | ||
| <td>new_array('h', 7)</td> | ||
| <td>(c_short * 7)()</td> | ||
| </tr> | ||
| <tr><td>new_array(c_int, (9, 2, 8))</td> | ||
| <td>new_array('i', (9, 2, 8))</td> | ||
| <td>(c_int * 3)(9, 2, 8)</td> | ||
| </tr> | ||
| </tbody> | ||
| </table> | ||
| </div> | ||
| <div class="section"> | ||
| <h1><a id="example" name="example">Example</a></h1> | ||
| <p>Below is an example where a number of ctypes objects are modified by a | ||
| child process</p> | ||
| <pre class="literal-block"> | ||
| from processing import Process | ||
| from processing.sharedctypes import new_value, new_array | ||
| from ctypes import Structure, c_double | ||
| class Point(Structure): | ||
| _fields_ = [('x', c_double), ('y', c_double)] | ||
| def modify(n, x, s, A): | ||
| n.value **= 2 | ||
| x.value **= 2 | ||
| s.value = s.value.upper() | ||
| for p in A: | ||
| p.x **= 2 | ||
| p.y **= 2 | ||
| if __name__ == '__main__': | ||
| n = new_value('i', 7) | ||
| x = new_value('d', 1.0/3.0) | ||
| s = new_array('c', 'hello world') | ||
| A = new_array(Point, [(1.875, -6.25), (-5.75, 2.0), (2.375, 9.5)]) | ||
| p = Process(target=modify, args=(n, x, s, A)) | ||
| p.start() | ||
| p.join() | ||
| print n | ||
| print x | ||
| print s.value | ||
| print [(p.x, p.y) for p in A] | ||
| </pre> | ||
| <p>The results printed are</p> | ||
| <pre class="literal-block"> | ||
| c_long(49) | ||
| c_double(0.1111111111111111) | ||
| HELLO WORLD | ||
| [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)] | ||
| </pre> | ||
| </div> | ||
| </div> | ||
| <div class="footer"> | ||
| <hr class="footer" /> | ||
| <a class="reference" href="pool-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="connection-ref.html">Next</a> | ||
| </div> | ||
| </body> | ||
| </html> |
| .. include:: header.txt | ||
| ======================== | ||
| Shared ctypes objects | ||
| ======================== | ||
| The `processing.sharedctypes` module provides functions for allocating | ||
| ctypes objects from shared memory which can be inherited by child | ||
| processes. (See the standard library's documentation for details of | ||
| the `ctypes` package.) | ||
| Note that access to a ctypes objects is not protected by any lock. | ||
| However accessing a ctypes object can be much faster (20+ times | ||
| faster) than accessing a synchronized shared object allocated using | ||
| `LocalManager`. | ||
| The functions in the module are | ||
| `new_value(fmt_or_type, *args)` | ||
| Returns a ctypes object allocated from shared memory. | ||
| `fmt_or_type` determines the type of the returned object: it | ||
| is either a ctypes type or a one character string format of | ||
| the kind used by the `array` module. The remaining arguments | ||
| are passed on to the constructor for the type. | ||
| `new_array(fmt_or_type, size_or_initializer)` | ||
| Returns a ctypes array allocated from shared memory. | ||
| `fmt_or_type` determines the type of the elements of the | ||
| returned array: it is either a ctypes type or a one character | ||
| string format of the kind used by the `array` module. If | ||
| `size_or_initializer` is an integer then it determines the | ||
| length of the array, and the array will be initially zeroed. | ||
| Otherwise `size_or_initializer` is a sequence which is used to | ||
| initialize the array and whose length determines the length of | ||
| the array. | ||
| `copy(obj)` | ||
| Returns a ctypes object allocated from shared memory which is | ||
| a copy of the ctypes object `obj`. | ||
| Equivalences | ||
| ============ | ||
| The table below compares the syntax for creating a shared ctypes | ||
| object from shared memory with the normal ctypes syntax. (In the | ||
| table `MyStruct` is some subclass of `ctypes.Structure`.) | ||
| ============================ =========================== ==================== | ||
| sharedctypes using type sharedctypes using format ctypes | ||
| ============================ =========================== ==================== | ||
| new_value(c_double, 2.4) new_value('d', 2.4) c_double(2.4) | ||
| new_value(MyStruct, 4, 6) MyStruct(4, 6) | ||
| new_array(c_short, 7) new_array('h', 7) (c_short * 7)() | ||
| new_array(c_int, (9, 2, 8)) new_array('i', (9, 2, 8)) (c_int * 3)(9, 2, 8) | ||
| ============================ =========================== ==================== | ||
| Example | ||
| ======= | ||
| Below is an example where a number of ctypes objects are modified by a | ||
| child process :: | ||
| from processing import Process | ||
| from processing.sharedctypes import new_value, new_array | ||
| from ctypes import Structure, c_double | ||
| class Point(Structure): | ||
| _fields_ = [('x', c_double), ('y', c_double)] | ||
| def modify(n, x, s, A): | ||
| n.value **= 2 | ||
| x.value **= 2 | ||
| s.value = s.value.upper() | ||
| for p in A: | ||
| p.x **= 2 | ||
| p.y **= 2 | ||
| if __name__ == '__main__': | ||
| n = new_value('i', 7) | ||
| x = new_value('d', 1.0/3.0) | ||
| s = new_array('c', 'hello world') | ||
| A = new_array(Point, [(1.875, -6.25), (-5.75, 2.0), (2.375, 9.5)]) | ||
| p = Process(target=modify, args=(n, x, s, A)) | ||
| p.start() | ||
| p.join() | ||
| print n | ||
| print x | ||
| print s.value | ||
| print [(p.x, p.y) for p in A] | ||
| The results printed are :: | ||
| c_long(49) | ||
| c_double(0.1111111111111111) | ||
| HELLO WORLD | ||
| [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)] | ||
| .. _Prev: pool-objects.html | ||
| .. _Up: processing-ref.html | ||
| .. _Next: connection-ref.html |
| .. |version| replace:: 0.39 |
+284
| # | ||
| # Module which supports allocation of memory from an mmap | ||
| # | ||
| # processing/heap.py | ||
| # | ||
| # Copyright (c) 2007, R Oudkerk --- see COPYING.txt | ||
| # | ||
| import bisect | ||
| import mmap | ||
| import tempfile | ||
| import os | ||
| import sys | ||
| import thread | ||
| from processing import Finalize, _processing, currentProcess | ||
| __all__ = ['BufferWrapper'] | ||
| # | ||
| # Class representing an mmap - can be inherited by child processes | ||
| # | ||
| class MMapWrapper(object): | ||
| __slots__ = ('__data', '__weakref__') | ||
| mmap = property(lambda self: self.__data[0]) | ||
| size = property(lambda self: self.__data[1]) | ||
| name = property(lambda self: self.__data[2]) | ||
| def __init__(self, size): | ||
| fd, name = tempfile.mkstemp(prefix='pym-') | ||
| remaining = size | ||
| while remaining > 0: | ||
| remaining -= os.write(fd, '\0' * remaining) | ||
| mmap_ = mmap.mmap(fd, size) | ||
| os.close(fd) | ||
| self.__data = (mmap_, size, name) | ||
| if sys.platform in ('win32', 'cygwin'): | ||
| Finalize( | ||
| self, MMapWrapper._finalize_heap, args=[mmap_, name], | ||
| atexit=True, priority=-10 | ||
| ) | ||
| else: | ||
| os.unlink(name) | ||
| def __getstate__(self): | ||
| assert sys.platform == 'win32' | ||
| return (self.name, self.size) | ||
| def __setstate__(self, state): | ||
| assert getattr(currentProcess(), '_unpickling', False), \ | ||
| 'mmaps should only be shared using process inheritance' | ||
| name, size = state | ||
| fd = os.open(name, os.O_RDWR | os.O_BINARY, int('0600', 8)) | ||
| mmap_ = mmap.mmap(fd, size) | ||
| os.close(fd) | ||
| self.__data = (mmap_, size, name) | ||
| @staticmethod | ||
| def _finalize_heap(mmap, name): | ||
| import os | ||
| mmap.close() | ||
| os.unlink(name) | ||
| # | ||
| # Class allowing allocation of (unmovable) chunks of memory from mmaps | ||
| # | ||
| class Heap(object): | ||
| _alignment = 4 | ||
| def __init__(self, size=1024): | ||
| self._lastpid = os.getpid() | ||
| self._lock = thread.allocate_lock() | ||
| self._size = size | ||
| self._lengths = [] | ||
| self._len_to_seq = {} | ||
| self._start_to_location = {} | ||
| self._stop_to_location = {} | ||
| self._allocated_locations = set() | ||
| self._mmaps = [] | ||
| def _roundup(self, n): | ||
| n = max(1, n) | ||
| q, r = divmod(n, self._alignment) | ||
| if r: | ||
| q += 1 | ||
| return q * self._alignment | ||
| def _malloc(self, size): | ||
| # returns a large enough block -- it might be much larger | ||
| i = bisect.bisect_left(self._lengths, size) | ||
| if i == len(self._lengths): | ||
| length = max(self._size, size) | ||
| self._size *= 2 | ||
| mmap = MMapWrapper(length) | ||
| self._mmaps.append(mmap) | ||
| return (mmap, 0, length) | ||
| else: | ||
| length = self._lengths[i] | ||
| seq = self._len_to_seq[length] | ||
| location = seq.pop() | ||
| if not seq: | ||
| del self._len_to_seq[length], self._lengths[i] | ||
| (mmap, start, stop) = location | ||
| del self._start_to_location[(mmap, start)] | ||
| del self._stop_to_location[(mmap, stop)] | ||
| return location | ||
| def _free(self, location): | ||
| # free location and try to merge with neighbours | ||
| (mmap, start, stop) = location | ||
| try: | ||
| prev_location = self._stop_to_location[(mmap, start)] | ||
| except KeyError: | ||
| pass | ||
| else: | ||
| start, _ = self._absorb(prev_location) | ||
| try: | ||
| next_location = self._start_to_location[(mmap, stop)] | ||
| except KeyError: | ||
| pass | ||
| else: | ||
| _, stop = self._absorb(next_location) | ||
| location = (mmap, start, stop) | ||
| length = stop - start | ||
| try: | ||
| self._len_to_seq[length].append(location) | ||
| except KeyError: | ||
| self._len_to_seq[length] = [location] | ||
| bisect.insort(self._lengths, length) | ||
| self._start_to_location[(mmap, start)] = location | ||
| self._stop_to_location[(mmap, stop)] = location | ||
| def _absorb(self, location): | ||
| # deregister this block so it can be merged with a neighbour | ||
| (mmap, start, stop) = location | ||
| del self._start_to_location[(mmap, start)] | ||
| del self._stop_to_location[(mmap, stop)] | ||
| length = stop - start | ||
| seq = self._len_to_seq[length] | ||
| seq.remove(location) | ||
| if not seq: | ||
| del self._len_to_seq[length] | ||
| self._lengths.remove(length) | ||
| return start, stop | ||
| def free(self, location): | ||
| # free a block returned by malloc() | ||
| self._lock.acquire() | ||
| try: | ||
| self._allocated_locations.remove(location) | ||
| self._free(location) | ||
| finally: | ||
| self._lock.release() | ||
| def malloc(self, size): | ||
| # return a block of right size (possibly rounded up) | ||
| self._lock.acquire() | ||
| try: | ||
| if os.getpid() != self._lastpid: | ||
| self.__init__() # reinitialize after fork | ||
| size = self._roundup(size) | ||
| (mmap, start, stop) = self._malloc(size) | ||
| new_stop = start + size | ||
| if new_stop < stop: | ||
| self._free((mmap, new_stop, stop)) | ||
| location = (mmap, start, new_stop) | ||
| self._allocated_locations.add(location) | ||
| return location | ||
| finally: | ||
| self._lock.release() | ||
| def _dump(self): | ||
| self._verify(dump=True) | ||
| def _verify(self, dump=False): | ||
| all = [] | ||
| occupied = 0 | ||
| for L in self._len_to_seq.values(): | ||
| for mmap, start, stop in L: | ||
| all.append((self._mmaps.index(mmap), start, stop, | ||
| stop-start, 'free')) | ||
| for mmap, start, stop in self._allocated_locations: | ||
| all.append((self._mmaps.index(mmap), start, stop, | ||
| stop-start, 'occupied')) | ||
| occupied += (stop-start) | ||
| all.sort() | ||
| if dump: | ||
| for line in all: | ||
| print '%8s%8s%8s%8s %s' % line | ||
| lengths = [len(w.mmap) for w in self._mmaps] | ||
| print 'mmap sizes =', lengths | ||
| print 'total size =', sum(lengths) | ||
| for i in range(len(all)-1): | ||
| (mmap, start, stop) = all[i][:3] | ||
| (nmmap, nstart, nstop) = all[i+1][:3] | ||
| assert ((mmap != nmmap and nstart == 0) or (stop == nstart)) | ||
| # | ||
| # Class representing a chunk of an mmap -- can be inherited | ||
| # | ||
| class BufferWrapper(object): | ||
| __slots__ = ('__data', '__weakref__') | ||
| _heap = Heap() | ||
| location = property(lambda self : self.__data[0]) | ||
| size = property(lambda self : self.__data[1]) | ||
| def __init__(self, size): | ||
| assert 0 <= size <= sys.maxint | ||
| location = BufferWrapper._heap.malloc(size) | ||
| self.__setstate__((location, size)) | ||
| Finalize(self, BufferWrapper._heap.free, args=[self.location]) | ||
| def getaddress(self): | ||
| w, start, stop = self.location | ||
| address, length = _processing.address_of_buffer(w.mmap) | ||
| assert self.size <= length | ||
| return address + start | ||
| def getview(self): | ||
| w, start, stop = self.location | ||
| return _processing.rwbuffer(w.mmap, start, self.size) | ||
| def __getstate__(self): | ||
| assert sys.platform == 'win32' | ||
| return self.__data | ||
| def __setstate__(self, state): | ||
| self.__data = state | ||
| # | ||
| # Test | ||
| # | ||
| def test(): | ||
| import random | ||
| iterations = 10000 | ||
| maxblocks = 50 | ||
| blocks = [] | ||
| maxsize = 0 | ||
| occ = 0 | ||
| maxocc = 0 | ||
| for i in xrange(iterations): | ||
| size = int(random.lognormvariate(0, 1) * 1000) | ||
| b = BufferWrapper(size) | ||
| occ += size | ||
| maxocc = max(maxocc, occ) | ||
| maxsize = max(maxsize, size) | ||
| blocks.append(b) | ||
| if len(blocks) > maxblocks: | ||
| i = random.randrange(maxblocks) | ||
| del blocks[i] | ||
| occ -= size | ||
| BufferWrapper._heap._dump() | ||
| print 'max size of a block =', maxsize | ||
| print 'max size occupied =', maxocc | ||
| print 'currently occupied =', occ | ||
| if __name__ == '__main__': | ||
| test() |
+131
| # | ||
| # Module which supports allocation of ctypes objects from shared memory | ||
| # | ||
| # processing/sharedctypes.py | ||
| # | ||
| # Copyright (c) 2007, R Oudkerk --- see COPYING.txt | ||
| # | ||
| import weakref | ||
| import ctypes | ||
| import sys | ||
| from processing import heap, Lock | ||
| __all__ = ['new_value', 'new_array', 'copy'] | ||
| # | ||
| # | ||
| # | ||
| def _new_value(type_): | ||
| size = ctypes.sizeof(type_) | ||
| wrapper = heap.BufferWrapper(size) | ||
| return rebuild_ctype(type_, wrapper, None) | ||
| def new_value(fmt_or_type, *args): | ||
| type_ = gettype(fmt_or_type) | ||
| obj = _new_value(type_) | ||
| ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj)) | ||
| obj.__init__(*args) | ||
| return obj | ||
| def new_array(fmt_or_type, size_or_initializer): | ||
| type_ = gettype(fmt_or_type) | ||
| if isinstance(type_, str): | ||
| type_ = _fmt_to_type[type_] | ||
| if isinstance(size_or_initializer, int): | ||
| type_ = type_ * size_or_initializer | ||
| return new_value(type_) | ||
| else: | ||
| type_ = type_ * len(size_or_initializer) | ||
| result = _new_value(type_) | ||
| result.__init__(*size_or_initializer) | ||
| return result | ||
| def copy(obj): | ||
| new_obj = _new_value(type(obj)) | ||
| ctypes.pointer(new_obj)[0] = obj | ||
| return new_obj | ||
| # | ||
| # Functions for pickling/unpickling | ||
| # | ||
| def reduce_ctype(obj): | ||
| assert sys.platform == 'win32' | ||
| if isinstance(obj, ctypes.Array): | ||
| return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_) | ||
| else: | ||
| return rebuild_ctype, (type(obj), obj._wrapper, None) | ||
| def rebuild_ctype(type_, wrapper, length): | ||
| if length is not None: | ||
| type_ = type_ * length | ||
| fixup(type_) | ||
| obj = type_.from_address(wrapper.getaddress()) | ||
| obj._wrapper = wrapper | ||
| return obj | ||
| def fixup(type_): | ||
| if (sys.platform == 'win32' and type_.__reduce__ is not reduce_ctype): | ||
| type_.__reduce__ = reduce_ctype | ||
| # | ||
| # Function which converts format strings to ctype types | ||
| # | ||
| def gettype(fmt_or_type, mapping={}): | ||
| if not mapping: | ||
| for name in dir(ctypes): | ||
| if name[:2] == 'c_': | ||
| T = getattr(ctypes, name) | ||
| if hasattr(T, '_type_'): | ||
| mapping[T._type_] = T | ||
| mapping.update(i=ctypes.c_int, I=ctypes.c_uint, | ||
| l=ctypes.c_long, L=ctypes.c_ulong) | ||
| return mapping.get(fmt_or_type, fmt_or_type) | ||
| # | ||
| # Tests | ||
| # | ||
| class _Foo(ctypes.Structure): | ||
| _fields_ = [ | ||
| ('x', ctypes.c_int), | ||
| ('y', ctypes.c_double) | ||
| ] | ||
| def _test(x, y, foo, arr, string): | ||
| x.value **= 2 | ||
| y.value **= 2 | ||
| foo.x **= 2 | ||
| foo.y **= 2 | ||
| for i in range(len(arr)): | ||
| arr[i] **= 2 | ||
| string.value = string.value.upper() | ||
| def test(): | ||
| from processing import Process | ||
| x = new_value('i', 7) | ||
| y = new_value(ctypes.c_double, 1.0/3.0) | ||
| foo = new_value(_Foo, 3, 2) | ||
| arr = new_array('d', range(10)) | ||
| string = new_array('c', 'hello world') | ||
| bar = copy(foo) | ||
| assert (foo.x, foo.y) == (bar.x, bar.y) | ||
| p = Process(target=_test, args=(x, y, foo, arr, string)) | ||
| p.start() | ||
| p.join() | ||
| print x.value | ||
| print y.value | ||
| print (foo.x, foo.y) | ||
| print arr[:] | ||
| print string.value | ||
| if __name__ == '__main__': | ||
| test() |
| #ifndef PROCESSING_DEFS_H | ||
| #define PROCESSING_DEFS_H | ||
| #define PY_SSIZE_T_CLEAN | ||
| #include "Python.h" | ||
| #if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN) | ||
| typedef int Py_ssize_t; | ||
| # define PY_SSIZE_T_MAX INT_MAX | ||
| # define PY_SSIZE_T_MIN INT_MIN | ||
| # define N_FMT "i" | ||
| #else | ||
| # define N_FMT "n" | ||
| #endif | ||
| #endif /* PROCESSING_DEFS_H */ |
| #ifndef PROCESSING_H | ||
| #define PROCESSING_H | ||
| static PyObject* | ||
| processing_rwbuffer(PyObject *self, PyObject *args) | ||
| { | ||
| PyObject *obj; | ||
| Py_ssize_t offset = 0, size = Py_END_OF_BUFFER; | ||
| if (!PyArg_ParseTuple(args, "O|" N_FMT N_FMT, &obj, &offset, &size)) | ||
| return NULL; | ||
| return PyBuffer_FromReadWriteObject(obj, offset, size); | ||
| } | ||
| static PyObject* | ||
| processing_address_of_buffer(PyObject *self, PyObject *obj) | ||
| { | ||
| void *buffer; | ||
| Py_ssize_t buffer_len; | ||
| if (PyObject_AsWriteBuffer(obj, &buffer, &buffer_len) < 0) | ||
| return NULL; | ||
| return Py_BuildValue(N_FMT N_FMT, buffer, buffer_len); | ||
| } | ||
| #endif /* PROCESSING_H */ |
+4
-4
@@ -39,3 +39,3 @@ # | ||
| __version__ = '0.38' | ||
| __version__ = '0.39' | ||
@@ -103,8 +103,8 @@ __all__ = [ | ||
| def Pipe(): | ||
| def Pipe(duplex=True): | ||
| ''' | ||
| Returns two connection object connected by a duplex pipe | ||
| Returns two connection object connected by a pipe | ||
| ''' | ||
| from processing.connection import Pipe | ||
| return Pipe() | ||
| return Pipe(duplex) | ||
@@ -111,0 +111,0 @@ def cpuCount(): |
+28
-0
@@ -5,2 +5,30 @@ ======================================== | ||
| Changes in 0.39 | ||
| --------------- | ||
| * One can now create one-way pipes by doing | ||
| `reader, writer = Pipe(duplex=False)`. | ||
| * Rewrote code for managing shared memory maps. | ||
| * Added a `sharedctypes` module for creating `ctypes` objects allocated | ||
| from shared memory. On Python 2.4 this requires the installation of | ||
| `ctypes`. | ||
| `ctypes` objects are not protected by any locks so you will need to | ||
| synchronize access to them (such as by using a lock). However they | ||
| can be much faster to access than equivalent objects allocated using | ||
| a `LocalManager`. | ||
| * Rearranged documentation. | ||
| * Previously the C extension caused a segfault on 64 bit machines with | ||
| Python 2.5 because it used `int` instead of `Py_ssize_t` in certain | ||
| places. This is now fixed. Thanks to Alexy Khrabrov for the report. | ||
| * A fix for `Pool.terminate()`. | ||
| * A fix for cleanup behaviour of `Queue`. | ||
| Changes in 0.38 | ||
@@ -7,0 +35,0 @@ --------------- |
+49
-14
@@ -256,23 +256,54 @@ # | ||
| def Pipe(): | ||
| def Pipe(duplex=True): | ||
| ''' | ||
| Returns pair of connection objects at either end of a duplex connection | ||
| Returns pair of connection objects at either end of a pipe | ||
| ''' | ||
| if sys.platform == 'win32': | ||
| assert '_processing' in globals() | ||
| l = Listener() | ||
| a = Client(l.address) | ||
| b = l.accept() | ||
| l.close() | ||
| return a, b | ||
| address = arbitrary_address('AF_PIPE') | ||
| PIPE_ACCESS_INBOUND = 1 | ||
| PIPE_ACCESS_OUTBOUND = 2 | ||
| GENERIC_READ = 0x80000000 | ||
| GENERIC_WRITE = 0x40000000 | ||
| if duplex: | ||
| openmode = PIPE_ACCESS_INBOUND | PIPE_ACCESS_OUTBOUND | ||
| access = GENERIC_READ | GENERIC_WRITE | ||
| else: | ||
| openmode = PIPE_ACCESS_INBOUND | ||
| access = GENERIC_WRITE | ||
| a = _processing.createpipe(address, openmode=openmode) | ||
| _processing.waitpipe(address, 1000) | ||
| b = _processing.createfile(address, access) | ||
| _processing.connectpipe(a) | ||
| c = _processing.PipeConnection(a) | ||
| d = _processing.PipeConnection(b) | ||
| _processing.CloseHandle(a) | ||
| _processing.CloseHandle(b) | ||
| return c, d | ||
| else: | ||
| a, b = socket.socketpair() | ||
| if duplex: | ||
| a, b = socket.socketpair() | ||
| else: | ||
| a, b = os.pipe() | ||
| try: | ||
| Connection = _processing.SocketConnection | ||
| c, d = Connection(a.fileno()), Connection(b.fileno()) | ||
| a.close() | ||
| b.close() | ||
| except NameError: | ||
| Connection = _SocketConnection | ||
| c, d = Connection(a), Connection(b) | ||
| else: | ||
| if duplex: | ||
| c, d = Connection(a.fileno()), Connection(b.fileno()) | ||
| a.close() | ||
| b.close() | ||
| else: | ||
| c, d = Connection(a), Connection(b) | ||
| os.close(a) | ||
| os.close(b) | ||
| return c, d | ||
@@ -330,3 +361,3 @@ | ||
| family = address_type(address) | ||
| s = socket.socket( getattr(socket, family) ) | ||
| s = socket.socket(getattr(socket, family)) | ||
@@ -412,3 +443,5 @@ endtime = time.time() + 10 | ||
| return _processing.PipeConnection(h) | ||
| conn = _processing.PipeConnection(h) | ||
| _processing.CloseHandle(h) | ||
| return conn | ||
@@ -490,2 +523,4 @@ # | ||
| w = _processing.SocketConnection(w) | ||
| os.close(r) | ||
| os.close(w) | ||
@@ -492,0 +527,0 @@ print 'sending: ', obj |
+14
-96
@@ -7,3 +7,3 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| <meta name="generator" content="Docutils 0.4: http://docutils.sourceforge.net/" /> | ||
| <title>Connection module</title> | ||
| <title>Listeners and Clients</title> | ||
| <link rel="stylesheet" href="html4css1.css" type="text/css" /> | ||
@@ -13,13 +13,13 @@ </head> | ||
| <div class="header"> | ||
| <a class="reference" href="pool-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="programming-guidelines.html">Next</a> | ||
| <a class="reference" href="sharedctypes.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="programming-guidelines.html">Next</a> | ||
| <hr class="header"/> | ||
| </div> | ||
| <div class="document" id="connection-module"> | ||
| <h1 class="title">Connection module</h1> | ||
| <p>The <tt class="docutils literal"><span class="pre">connection</span></tt> module allows the sending of picklable objects | ||
| between processes using sockets or (on Windows) named pipes. It | ||
| also has support for <em>digest authentication</em> (using the <tt class="docutils literal"><span class="pre">hmac</span></tt> module | ||
| from the standard library).</p> | ||
| <p>If the C extension <tt class="docutils literal"><span class="pre">_processing</span></tt> is not available then connections | ||
| will be slower and on Windows one will not be able to use named pipes.</p> | ||
| <div class="document" id="listeners-and-clients"> | ||
| <h1 class="title">Listeners and Clients</h1> | ||
| <p>Usually message passing between processes is done using queues or by | ||
| using connection objects returned by <tt class="docutils literal"><span class="pre">Pipe()</span></tt>.</p> | ||
| <p>However, the <tt class="docutils literal"><span class="pre">processing.connection</span></tt> module allows some extra | ||
| flexibility. It basically gives a high level API for dealing with | ||
| sockets or Windows named pipes, and also has support for <em>digest | ||
| authentication</em> using the <tt class="docutils literal"><span class="pre">hmac</span></tt> module from the standard library.</p> | ||
| <div class="section"> | ||
@@ -35,7 +35,7 @@ <h1><a id="classes-and-functions" name="classes-and-functions">Classes and functions</a></h1> | ||
| <dd><p class="first">Attempts to set up a connection to the listener which is using | ||
| address <tt class="docutils literal"><span class="pre">address</span></tt>.</p> | ||
| address <tt class="docutils literal"><span class="pre">address</span></tt>, returning a <a class="reference" href="connection-object.html">connection object</a>.</p> | ||
| <p>The type of the connection is determined by <tt class="docutils literal"><span class="pre">family</span></tt> | ||
| argument, but this can generally be omitted since it can | ||
| usually be inferred from the format of <tt class="docutils literal"><span class="pre">address</span></tt>.</p> | ||
| <p>If <tt class="docutils literal"><span class="pre">authentication</span></tt> or <tt class="docutils literal"><span class="pre">authkey</span></tt> is a string then then digest | ||
| <p class="last">If <tt class="docutils literal"><span class="pre">authentication</span></tt> or <tt class="docutils literal"><span class="pre">authkey</span></tt> is a string then digest | ||
| authentication is used. The key used for authentication will | ||
@@ -45,26 +45,3 @@ be either <tt class="docutils literal"><span class="pre">authkey</span></tt> or <tt class="docutils literal"><span class="pre">currentProcess.getAuthKey()</span></tt> if | ||
| <tt class="docutils literal"><span class="pre">AuthenticationError</span></tt> is raised. See <a class="reference" href="#authentication-keys">Authentication keys</a>.</p> | ||
| <p class="last">A <tt class="docutils literal"><span class="pre">Connection</span></tt> object is returned. (See <a class="reference" href="#connection-objects">Connection | ||
| objects</a>.)</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">Pipe()</span></tt></dt> | ||
| <dd><p class="first">Returns a pair of two connection objects (see <a class="reference" href="#connection-objects">Connection | ||
| objects</a>) representing the ends of a duplex connection. | ||
| It is also available directly from <tt class="docutils literal"><span class="pre">processing</span></tt>.</p> | ||
| <p>For example:</p> | ||
| <pre class="literal-block"> | ||
| from processing import Process, Pipe | ||
| def foo(conn): | ||
| conn.send(42) | ||
| if __name__ == '__main__': | ||
| c, d = Pipe() | ||
| p = Process(target=foo, args=[d]) | ||
| p.start() | ||
| print c.recv() # prints 42 | ||
| p.join() | ||
| </pre> | ||
| <p class="last">Note that at most one thread/process should be sending or | ||
| receiving from a connection object at a given time.</p> | ||
| </dd> | ||
| </dl> | ||
@@ -152,3 +129,3 @@ </blockquote> | ||
| then <tt class="docutils literal"><span class="pre">AuthenticationError</span></tt> is raised.</p> | ||
| <p class="last">Returns a <tt class="docutils literal"><span class="pre">Connection</span></tt> object. See <a class="reference" href="#connection-objects">Connection objects</a>.</p> | ||
| <p class="last">Returns a <tt class="docutils literal"><span class="pre">connection</span> <span class="pre">object</span> <span class="pre"><connection-object.html></span></tt> object.</p> | ||
| </dd> | ||
@@ -175,61 +152,2 @@ <dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt> | ||
| <div class="section"> | ||
| <h1><a id="connection-objects" name="connection-objects">Connection objects</a></h1> | ||
| <p>A connection object represents one end of a message oriented socket or | ||
| pipe connection. Connection objects have the following methods:</p> | ||
| <blockquote> | ||
| <dl class="docutils"> | ||
| <dt><tt class="docutils literal"><span class="pre">send(obj)</span></tt></dt> | ||
| <dd><p class="first">Send an object to the other end of the connection which should | ||
| be read using <tt class="docutils literal"><span class="pre">recv()</span></tt>.</p> | ||
| <p class="last">The object must be picklable.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">recv()</span></tt></dt> | ||
| <dd>Return an object sent from the other end of the connection | ||
| using <tt class="docutils literal"><span class="pre">send()</span></tt>.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">fileno()</span></tt></dt> | ||
| <dd>Returns the file descriptor or handle used by the connection.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt> | ||
| <dd><p class="first">Close the connection.</p> | ||
| <p class="last">This is called automatically when the connection is garbage | ||
| collected.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">poll(timeout)</span></tt></dt> | ||
| <dd>Return whether there is any data available to be read.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">sendbytes(buffer)</span></tt></dt> | ||
| <dd><p class="first">Send byte data from an object supporting the buffer interface | ||
| as a complete message.</p> | ||
| <p class="last">Can be used to send strings.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">recvbytes()</span></tt></dt> | ||
| <dd>Return a complete message of byte data sent from the other end | ||
| of the connection as a string.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">recvbytes_into(buffer)</span></tt></dt> | ||
| <dd><p class="first">Read into buffer a complete message of byte data sent from the | ||
| other end of the connection and return the number of bytes in | ||
| the message.</p> | ||
| <p class="last">If the buffer is too short then a <tt class="docutils literal"><span class="pre">BufferTooShort</span></tt> exception | ||
| is raised and the complete message of bytes data is available | ||
| as <tt class="docutils literal"><span class="pre">e.args[0]</span></tt> where <tt class="docutils literal"><span class="pre">e</span></tt> is the exception instance.</p> | ||
| </dd> | ||
| </dl> | ||
| </blockquote> | ||
| <div class="warning"> | ||
| <p class="first admonition-title">Warning</p> | ||
| <p class="last">The <tt class="docutils literal"><span class="pre">recv()</span></tt> method automatically unpickles the data it | ||
| receives which can be a security risk. Therefore if you are using | ||
| the <tt class="docutils literal"><span class="pre">recv()</span></tt> and <tt class="docutils literal"><span class="pre">send()</span></tt> methods you should be using some | ||
| form of authentication. See <a class="reference" href="#authentication-keys">Authentication keys</a>.</p> | ||
| </div> | ||
| </div> | ||
| <div class="section"> | ||
| <h1><a id="transferring-connection-objects-between-processes" name="transferring-connection-objects-between-processes">Transferring connection objects between processes</a></h1> | ||
| <p>If the C extension <tt class="docutils literal"><span class="pre">processing._process</span></tt> is available and contains | ||
| support then socket objects, connection objects and file objects can | ||
| be successfully pickled in one process and unpickled in another.</p> | ||
| <p>Note however that on Windows there is no <tt class="docutils literal"><span class="pre">socket.fromfd()</span></tt> function. | ||
| As a result on Windows an unpickled socket object is not a true socket | ||
| object: only the <tt class="docutils literal"><span class="pre">recv()</span></tt>, <tt class="docutils literal"><span class="pre">send()</span></tt>, <tt class="docutils literal"><span class="pre">sendall()</span></tt>, <tt class="docutils literal"><span class="pre">close()</span></tt> and | ||
| <tt class="docutils literal"><span class="pre">fileno()</span></tt> methods will work.</p> | ||
| </div> | ||
| <div class="section"> | ||
| <h1><a id="address-formats" name="address-formats">Address formats</a></h1> | ||
@@ -322,5 +240,5 @@ <ul> | ||
| <hr class="footer" /> | ||
| <a class="reference" href="pool-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="programming-guidelines.html">Next</a> | ||
| <a class="reference" href="sharedctypes.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="programming-guidelines.html">Next</a> | ||
| </div> | ||
| </body> | ||
| </html> |
+14
-107
| .. include:: header.txt | ||
| =================== | ||
| Connection module | ||
| =================== | ||
| ======================= | ||
| Listeners and Clients | ||
| ======================= | ||
| The `connection` module allows the sending of picklable objects | ||
| between processes using sockets or (on Windows) named pipes. It | ||
| also has support for *digest authentication* (using the `hmac` module | ||
| from the standard library). | ||
| Usually message passing between processes is done using queues or by | ||
| using connection objects returned by `Pipe()`. | ||
| If the C extension `_processing` is not available then connections | ||
| will be slower and on Windows one will not be able to use named pipes. | ||
| However, the `processing.connection` module allows some extra | ||
| flexibility. It basically gives a high level API for dealing with | ||
| sockets or Windows named pipes, and also has support for *digest | ||
| authentication* using the `hmac` module from the standard library. | ||
@@ -27,3 +27,4 @@ | ||
| Attempts to set up a connection to the listener which is using | ||
| address `address`. | ||
| address `address`, returning a `connection object | ||
| <connection-object.html>`_. | ||
@@ -34,3 +35,3 @@ The type of the connection is determined by `family` | ||
| If `authentication` or `authkey` is a string then then digest | ||
| If `authentication` or `authkey` is a string then digest | ||
| authentication is used. The key used for authentication will | ||
@@ -41,27 +42,2 @@ be either `authkey` or `currentProcess.getAuthKey()` if | ||
| A `Connection` object is returned. (See `Connection | ||
| objects`_.) | ||
| `Pipe()` | ||
| Returns a pair of two connection objects (see `Connection | ||
| objects`_) representing the ends of a duplex connection. | ||
| It is also available directly from `processing`. | ||
| For example:: | ||
| from processing import Process, Pipe | ||
| def foo(conn): | ||
| conn.send(42) | ||
| if __name__ == '__main__': | ||
| c, d = Pipe() | ||
| p = Process(target=foo, args=[d]) | ||
| p.start() | ||
| print c.recv() # prints 42 | ||
| p.join() | ||
| Note that at most one thread/process should be sending or | ||
| receiving from a connection object at a given time. | ||
| .. | ||
@@ -158,3 +134,3 @@ `deliver_challenge(connection, authkey)` | ||
| Returns a `Connection` object. See `Connection objects`_. | ||
| Returns a `connection object <connection-object.html>` object. | ||
@@ -179,71 +155,2 @@ `close()` | ||
| Connection objects | ||
| ================== | ||
| A connection object represents one end of a message oriented socket or | ||
| pipe connection. Connection objects have the following methods: | ||
| `send(obj)` | ||
| Send an object to the other end of the connection which should | ||
| be read using `recv()`. | ||
| The object must be picklable. | ||
| `recv()` | ||
| Return an object sent from the other end of the connection | ||
| using `send()`. | ||
| `fileno()` | ||
| Returns the file descriptor or handle used by the connection. | ||
| `close()` | ||
| Close the connection. | ||
| This is called automatically when the connection is garbage | ||
| collected. | ||
| `poll(timeout)` | ||
| Return whether there is any data available to be read. | ||
| `sendbytes(buffer)` | ||
| Send byte data from an object supporting the buffer interface | ||
| as a complete message. | ||
| Can be used to send strings. | ||
| `recvbytes()` | ||
| Return a complete message of byte data sent from the other end | ||
| of the connection as a string. | ||
| `recvbytes_into(buffer)` | ||
| Read into buffer a complete message of byte data sent from the | ||
| other end of the connection and return the number of bytes in | ||
| the message. | ||
| If the buffer is too short then a `BufferTooShort` exception | ||
| is raised and the complete message of bytes data is available | ||
| as `e.args[0]` where `e` is the exception instance. | ||
| .. warning:: | ||
| The `recv()` method automatically unpickles the data it | ||
| receives which can be a security risk. Therefore if you are using | ||
| the `recv()` and `send()` methods you should be using some | ||
| form of authentication. See `Authentication keys`_. | ||
| Transferring connection objects between processes | ||
| ================================================= | ||
| If the C extension `processing._process` is available and contains | ||
| support then socket objects, connection objects and file objects can | ||
| be successfully pickled in one process and unpickled in another. | ||
| Note however that on Windows there is no `socket.fromfd()` function. | ||
| As a result on Windows an unpickled socket object is not a true socket | ||
| object: only the `recv()`, `send()`, `sendall()`, `close()` and | ||
| `fileno()` methods will work. | ||
| Address formats | ||
@@ -340,5 +247,5 @@ =============== | ||
| .. _Prev: pool-objects.html | ||
| .. _Prev: sharedctypes.html | ||
| .. _Up: processing-ref.html | ||
| .. _Next: programming-guidelines.html | ||
+7
-4
@@ -7,3 +7,3 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| <meta name="generator" content="Docutils 0.4: http://docutils.sourceforge.net/" /> | ||
| <title>Documentation for processing</title> | ||
| <title>Documentation for processing-0.39</title> | ||
| <meta name="author" content="R Oudkerk" /> | ||
@@ -17,4 +17,4 @@ <link rel="stylesheet" href="html4css1.css" type="text/css" /> | ||
| </div> | ||
| <div class="document" id="documentation-for-processing"> | ||
| <h1 class="title">Documentation for processing</h1> | ||
| <div class="document" id="documentation-for-processing-version"> | ||
| <h1 class="title">Documentation for processing-0.39</h1> | ||
| <table class="docinfo" frame="void" rules="none"> | ||
@@ -43,6 +43,9 @@ <col class="docinfo-name" /> | ||
| <li><a class="reference" href="process-objects.html">Process objects</a></li> | ||
| <li><a class="reference" href="queue-objects.html">Queue objects</a></li> | ||
| <li><a class="reference" href="connection-objects.html">Connection objects</a></li> | ||
| <li><a class="reference" href="manager-objects.html">Manager objects</a></li> | ||
| <li><a class="reference" href="proxy-objects.html">Proxy objects</a></li> | ||
| <li><a class="reference" href="pool-objects.html">Pool objects</a></li> | ||
| <li><a class="reference" href="connection-ref.html">connection module</a></li> | ||
| <li><a class="reference" href="sharedctypes.html">Shared ctypes object</a></li> | ||
| <li><a class="reference" href="connection-ref.html">Listeners and Clients</a></li> | ||
| </ul> | ||
@@ -49,0 +52,0 @@ </dd> |
+8
-4
| .. include:: header.txt | ||
| .. include:: version.txt | ||
| ============================== | ||
| Documentation for processing | ||
| ============================== | ||
| ======================================== | ||
| Documentation for processing-|version| | ||
| ======================================== | ||
@@ -19,6 +20,9 @@ :Author: R Oudkerk | ||
| + `Process objects <process-objects.html>`_ | ||
| + `Queue objects <queue-objects.html>`_ | ||
| + `Connection objects <connection-objects.html>`_ | ||
| + `Manager objects <manager-objects.html>`_ | ||
| + `Proxy objects <proxy-objects.html>`_ | ||
| + `Pool objects <pool-objects.html>`_ | ||
| + `connection module <connection-ref.html>`_ | ||
| + `Shared ctypes object <sharedctypes.html>`_ | ||
| + `Listeners and Clients <connection-ref.html>`_ | ||
@@ -25,0 +29,0 @@ * `Programming guidelines <programming-guidelines.html>`_ |
+21
-15
@@ -114,12 +114,8 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| </pre> | ||
| <p>Queues are thread and process safe.</p> | ||
| <p class="last">Note that if you need to guarantee that the <tt class="docutils literal"><span class="pre">put()</span></tt> method | ||
| will succeed without blocking -- which is sometimes very | ||
| useful when ensuring that a deadlock cannot occur -- then you | ||
| can use <tt class="docutils literal"><span class="pre">BufferedQueue()</span></tt> instead of <tt class="docutils literal"><span class="pre">Queue()</span></tt>. | ||
| See <a class="reference" href="processing-ref.html#pipes-and-queues">Queues</a>.</p> | ||
| <p class="last">Queues are thread and process safe. See <a class="reference" href="processing-ref.html#pipes-and-queues">Queues</a>.</p> | ||
| </dd> | ||
| <dt><strong>Pipes</strong>:</dt> | ||
| <dd><p class="first">The <tt class="docutils literal"><span class="pre">Pipe()</span></tt> function returns a pair of connection objects | ||
| connected by a duplex (two-way) pipe, for example</p> | ||
| connected by a pipe which by default is duplex (two-way). For | ||
| example</p> | ||
| <pre class="literal-block"> | ||
@@ -213,3 +209,3 @@ from processing import Process, Pipe | ||
| </pre> | ||
| <p class="last">Note that <tt class="docutils literal"><span class="pre">SharedValue</span></tt>, <tt class="docutils literal"><span class="pre">SharedStruct</span></tt> and <tt class="docutils literal"><span class="pre">SharedArray</span></tt> objects | ||
| <p>Note that <tt class="docutils literal"><span class="pre">SharedValue</span></tt>, <tt class="docutils literal"><span class="pre">SharedStruct</span></tt> and <tt class="docutils literal"><span class="pre">SharedArray</span></tt> objects | ||
| are thread and process safe. The <tt class="docutils literal"><span class="pre">'i'</span></tt> and <tt class="docutils literal"><span class="pre">'d256p'</span></tt> arguments | ||
@@ -219,3 +215,8 @@ used when creating <tt class="docutils literal"><span class="pre">num</span></tt>, <tt class="docutils literal"><span class="pre">struct</span></tt> and <tt class="docutils literal"><span class="pre">array</span></tt> are format strings | ||
| indicates a signed integer, <tt class="docutils literal"><span class="pre">'d'</span></tt> indicates a double precision | ||
| float and <tt class="docutils literal"><span class="pre">'256p'</span></tt> indicates a string of length less than 256.</p> | ||
| float and <tt class="docutils literal"><span class="pre">'256p'</span></tt> indicates a string of length less than 256. See | ||
| <a class="reference" href="manager-objects.html#shared-memory-managers">Shared memory managers</a>.</p> | ||
| <p class="last">A faster alternative to <tt class="docutils literal"><span class="pre">LocalManager</span></tt> is the <tt class="docutils literal"><span class="pre">processing.sharedctypes</span></tt> | ||
| module which supports the creation of <a class="reference" href="sharedctypes.html">ctypes objects allocated from | ||
| shared memory</a>. However, ctypes objects are not | ||
| "process safe", so one must synchronize access to them.</p> | ||
| </dd> | ||
@@ -265,3 +266,4 @@ <dt><strong>Server process</strong>:</dt> | ||
| can be shared by different computers over a network. They are, | ||
| however, slower than using shared memory.</p> | ||
| however, slower than using shared memory. See <a class="reference" href="manager-objects.html#server-process-managers">Server process | ||
| managers</a>.</p> | ||
| </dd> | ||
@@ -429,9 +431,13 @@ </dl> | ||
| <tr><td>list</td> | ||
| <td>6,000,000</td> | ||
| <td>4,800,000</td> | ||
| <td>6,400,000</td> | ||
| <td>5,100,000</td> | ||
| </tr> | ||
| <tr><td>shared memory array</td> | ||
| <td>120,000</td> | ||
| <td>110,000</td> | ||
| <tr><td>ctypes shared array</td> | ||
| <td>3,600,000</td> | ||
| <td>3,100,000</td> | ||
| </tr> | ||
| <tr><td>LocalManager shared array</td> | ||
| <td>125,000</td> | ||
| <td>135,000</td> | ||
| </tr> | ||
| <tr><td>list managed by server</td> | ||
@@ -438,0 +444,0 @@ <td>20,000</td> |
+16
-12
@@ -119,13 +119,9 @@ .. include:: header.txt | ||
| Queues are thread and process safe. | ||
| Queues are thread and process safe. See `Queues | ||
| <processing-ref.html#pipes-and-queues>`_. | ||
| Note that if you need to guarantee that the `put()` method | ||
| will succeed without blocking -- which is sometimes very | ||
| useful when ensuring that a deadlock cannot occur -- then you | ||
| can use `BufferedQueue()` instead of `Queue()`. | ||
| See `Queues <processing-ref.html#pipes-and-queues>`_. | ||
| **Pipes**: | ||
| The `Pipe()` function returns a pair of connection objects | ||
| connected by a duplex (two-way) pipe, for example :: | ||
| connected by a pipe which by default is duplex (two-way). For | ||
| example :: | ||
@@ -228,4 +224,10 @@ from processing import Process, Pipe | ||
| indicates a signed integer, `'d'` indicates a double precision | ||
| float and `'256p'` indicates a string of length less than 256. | ||
| float and `'256p'` indicates a string of length less than 256. See | ||
| `Shared memory managers <manager-objects.html#shared-memory-managers>`_. | ||
| A faster alternative to `LocalManager` is the `processing.sharedctypes` | ||
| module which supports the creation of `ctypes objects allocated from | ||
| shared memory <sharedctypes.html>`_. However, ctypes objects are not | ||
| "process safe", so one must synchronize access to them. | ||
| **Server process**: | ||
@@ -277,3 +279,4 @@ A manager object returned by `processing.Manager()` | ||
| can be shared by different computers over a network. They are, | ||
| however, slower than using shared memory. | ||
| however, slower than using shared memory. See `Server process | ||
| managers <manager-objects.html#server-process-managers>`_. | ||
@@ -364,4 +367,5 @@ | ||
| ============================== ========== ========== | ||
| list 6,000,000 4,800,000 | ||
| shared memory array 120,000 110,000 | ||
| list 6,400,000 5,100,000 | ||
| ctypes shared array 3,600,000 3,100,000 | ||
| LocalManager shared array 125,000 135,000 | ||
| list managed by server 20,000 17,000 | ||
@@ -368,0 +372,0 @@ ============================== ========== ========== |
@@ -12,3 +12,3 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| <div class="header"> | ||
| <a class="reference" href="process-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="proxy-objects.html">Next</a> | ||
| <a class="reference" href="connection-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="proxy-objects.html">Next</a> | ||
| <hr class="header"/> | ||
@@ -384,5 +384,5 @@ </div> | ||
| <hr class="footer" /> | ||
| <a class="reference" href="process-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="proxy-objects.html">Next</a> | ||
| <a class="reference" href="connection-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="proxy-objects.html">Next</a> | ||
| </div> | ||
| </body> | ||
| </html> |
@@ -408,5 +408,5 @@ .. include:: header.txt | ||
| .. _Prev: process-objects.html | ||
| .. _Prev: connection-objects.html | ||
| .. _Up: processing-ref.html | ||
| .. _Next: proxy-objects.html | ||
@@ -12,3 +12,3 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| <div class="header"> | ||
| <a class="reference" href="proxy-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="connection-ref.html">Next</a> | ||
| <a class="reference" href="proxy-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="sharedctypes.html">Next</a> | ||
| <hr class="header"/> | ||
@@ -73,4 +73,4 @@ </div> | ||
| <p class="last">Also if <tt class="docutils literal"><span class="pre">chunksize</span></tt> is <tt class="docutils literal"><span class="pre">1</span></tt> then the <tt class="docutils literal"><span class="pre">next()</span></tt> method of the | ||
| iterator returned by the <tt class="docutils literal"><span class="pre">imap()</span></tt> method has a <tt class="docutils literal"><span class="pre">timeout</span></tt> | ||
| parameter: <tt class="docutils literal"><span class="pre">next(timeout)</span></tt> will raise | ||
| iterator returned by the <tt class="docutils literal"><span class="pre">imap()</span></tt> method has an optional | ||
| <tt class="docutils literal"><span class="pre">timeout</span></tt> parameter: <tt class="docutils literal"><span class="pre">next(timeout)</span></tt> will raise | ||
| <tt class="docutils literal"><span class="pre">processing.TimeoutError</span></tt> if the result cannot be returned | ||
@@ -152,5 +152,5 @@ within <tt class="docutils literal"><span class="pre">timeout</span></tt> seconds.</p> | ||
| <hr class="footer" /> | ||
| <a class="reference" href="proxy-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="connection-ref.html">Next</a> | ||
| <a class="reference" href="proxy-objects.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="sharedctypes.html">Next</a> | ||
| </div> | ||
| </body> | ||
| </html> |
@@ -152,2 +152,2 @@ .. include:: header.txt | ||
| .. _Up: processing-ref.html | ||
| .. _Next: connection-ref.html | ||
| .. _Next: sharedctypes.html |
@@ -12,3 +12,3 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| <div class="header"> | ||
| <a class="reference" href="processing-ref.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="manager-objects.html">Next</a> | ||
| <a class="reference" href="processing-ref.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="queue-objects.html">Next</a> | ||
| <hr class="header"/> | ||
@@ -193,5 +193,5 @@ </div> | ||
| <hr class="footer" /> | ||
| <a class="reference" href="processing-ref.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="manager-objects.html">Next</a> | ||
| <a class="reference" href="processing-ref.html">Prev</a> <a class="reference" href="processing-ref.html">Up</a> <a class="reference" href="queue-objects.html">Next</a> | ||
| </div> | ||
| </body> | ||
| </html> |
@@ -207,2 +207,2 @@ .. include:: header.txt | ||
| .. _Up: processing-ref.html | ||
| .. _Next: manager-objects.html | ||
| .. _Next: queue-objects.html |
+29
-144
@@ -32,3 +32,3 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| <dd><p class="first">Exception raised by the <tt class="docutils literal"><span class="pre">recvbytes_into()</span></tt> method of a | ||
| <a class="reference" href="connection-ref.html#connection-objects">connection object</a> | ||
| <a class="reference" href="connection-objects.html">connection object</a> | ||
| when the supplied buffer object is too small for the message | ||
@@ -55,112 +55,26 @@ read.</p> | ||
| <dl class="docutils"> | ||
| <dt><tt class="docutils literal"><span class="pre">Pipe()</span></tt></dt> | ||
| <dd><p class="first">Returns a pair of connection objects representing the ends of | ||
| a duplex pipe.</p> | ||
| <p>These connection objects can be inherited by child processes | ||
| and have methods <tt class="docutils literal"><span class="pre">send()</span></tt> and <tt class="docutils literal"><span class="pre">recv()</span></tt> (among others) for | ||
| sending and receiving picklable objects. (See <a class="reference" href="connection-ref.html#connection-objects">Connection | ||
| objects</a>.) For | ||
| example:</p> | ||
| <pre class="literal-block"> | ||
| >>> from processing import Pipe | ||
| >>> a, b = Pipe() | ||
| >>> a.send([1, 'hello', None]) | ||
| >>> b.recv() | ||
| [1, 'hello', None] | ||
| >>> b.sendbytes('thank you') | ||
| >>> a.recvbytes() | ||
| 'thank you' | ||
| </pre> | ||
| <p>Note that it is not safe to have more than one process (or | ||
| thread) reading or writing to the same end of a pipe at the | ||
| same time.</p> | ||
| <p>Also note that if a process is killed while it is trying to | ||
| read or write to a pipe then the data in the pipe is likely to | ||
| become corrupted (because it may become impossible to be sure | ||
| where the message boundaries lie).</p> | ||
| <p class="last">On Windows this requires the <tt class="docutils literal"><span class="pre">_processing</span></tt> extension.</p> | ||
| <dt><tt class="docutils literal"><span class="pre">Pipe(duplex=True)</span></tt></dt> | ||
| <dd><p class="first">Returns a pair <tt class="docutils literal"><span class="pre">(conn1,</span> <span class="pre">conn2)</span></tt> of connection objects | ||
| representing the ends of a pipe.</p> | ||
| <p>If <tt class="docutils literal"><span class="pre">duplex</span></tt> is true then the pipe is two way; otherwise | ||
| <tt class="docutils literal"><span class="pre">conn1</span></tt> can only be used for receiving messages and <tt class="docutils literal"><span class="pre">conn2</span></tt> | ||
| can only be used for sending messages.</p> | ||
| <p class="last">See <a class="reference" href="connection-objects.html">Connection objects</a>.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">Queue(maxsize=0)</span></tt></dt> | ||
| <dd><p class="first">Returns a process shared queue implemented using a pipe and a | ||
| few locks/semaphores. A background thread transfers objects | ||
| from a buffer into the pipe.</p> | ||
| <p>It is a near clone of <tt class="docutils literal"><span class="pre">Queue.Queue</span></tt> except that the <tt class="docutils literal"><span class="pre">qsize()</span></tt> | ||
| method is not implemented and that the <tt class="docutils literal"><span class="pre">task_done()</span></tt> and | ||
| <tt class="docutils literal"><span class="pre">join()</span></tt> methods introduced in Python 2.5 are also missing.</p> | ||
| <p><tt class="docutils literal"><span class="pre">Queue</span></tt> has a few additional methods:</p> | ||
| <blockquote> | ||
| <dl class="docutils"> | ||
| <dt><tt class="docutils literal"><span class="pre">putmany(iterable)</span></tt></dt> | ||
| <dd>If the queue has infinite size then this adds all | ||
| items in the iterable to the queue's buffer. So | ||
| <tt class="docutils literal"><span class="pre">q.putmany(X)</span></tt> is a faster alternative to <tt class="docutils literal"><span class="pre">for</span> <span class="pre">x</span> <span class="pre">in</span> <span class="pre">X:</span> | ||
| <span class="pre">q.put(x)</span></tt>. Raises an error if the queue has finite | ||
| size.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">close()</span></tt></dt> | ||
| <dd>Indicates that no more data will be put on this queue | ||
| by the current process. The background thread will | ||
| quit once it has flushed all it data. This is called | ||
| automatically when the queue is garbage collected.</dd> | ||
| <dt><tt class="docutils literal"><span class="pre">jointhread()</span></tt></dt> | ||
| <dd><p class="first">This joins the background thread and can only be used | ||
| after <tt class="docutils literal"><span class="pre">close()</span></tt> has been called. This blocks until | ||
| the background thread exits, ensuring that all data in | ||
| the buffer has been flushed to the pipe.</p> | ||
| <p class="last">By default if a process is not the creator of the | ||
| queue then on exit it will attempt to join the queue's | ||
| background thread. The process can call | ||
| <tt class="docutils literal"><span class="pre">canceljoin()</span></tt> to prevent this behaviour.</p> | ||
| <dt><tt class="docutils literal"><span class="pre">Queue(maxsize=0)</span></tt>, <tt class="docutils literal"><span class="pre">SimpleQueue()</span></tt>, <tt class="docutils literal"><span class="pre">PosixQueue(maxsize=0)</span></tt></dt> | ||
| <dd><p class="first">These functions return a process shared queue implemented in | ||
| different ways. The usual <tt class="docutils literal"><span class="pre">Empty</span></tt> and <tt class="docutils literal"><span class="pre">Full</span></tt> exceptions from | ||
| the <tt class="docutils literal"><span class="pre">Queue</span></tt> module are raised to signal timeouts.</p> | ||
| <p class="last">See <a class="reference" href="queue-objects.html">Queue objects</a>.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">canceljoin()</span></tt></dt> | ||
| <dd>Prevents the background thread from being joined | ||
| automatically when the process exits. Unnecessary if | ||
| the current process created the queue.</dd> | ||
| </dl> | ||
| </blockquote> | ||
| <p class="last">Note that if a process is killed while it is trying to receive | ||
| or send to a queue then the data in the queue is likely to | ||
| become corrupted (because it may become impossible to be sure | ||
| where the message boundaries lie).</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">SimpleQueue()</span></tt></dt> | ||
| <dd><p class="first">A simplified and faster alternative to <tt class="docutils literal"><span class="pre">Queue()</span></tt>. It is | ||
| really just a pipe protected by a couple of locks.</p> | ||
| <p>It has <tt class="docutils literal"><span class="pre">get()</span></tt> and <tt class="docutils literal"><span class="pre">put()</span></tt> methods but these do not have | ||
| <tt class="docutils literal"><span class="pre">block</span></tt> or <tt class="docutils literal"><span class="pre">timeout</span></tt> arguments. It also has an <tt class="docutils literal"><span class="pre">empty()</span></tt> | ||
| method.</p> | ||
| <p class="last"><strong>Warning</strong>: Unlike with <tt class="docutils literal"><span class="pre">Queue</span></tt> (when <tt class="docutils literal"><span class="pre">maxsize</span></tt> is zero) | ||
| using <tt class="docutils literal"><span class="pre">put()</span></tt> may block if the pipe's buffer does not have | ||
| sufficient space, so one must take care that no deadlocks are | ||
| possible.</p> | ||
| </dd> | ||
| <dt><tt class="docutils literal"><span class="pre">PosixQueue(maxsize=0,</span> <span class="pre">msgsize=0)</span></tt></dt> | ||
| <dd><p class="first">A faster alternative to <tt class="docutils literal"><span class="pre">Queue()</span></tt> which is available on Unix | ||
| systems which support Posix message queues.</p> | ||
| <p>However, posix queues have a maximum number of messages that | ||
| can occupy the queue at a given time, and each message (when | ||
| expressed as a pickled string) has a maximum length --- see | ||
| <tt class="docutils literal"><span class="pre">man</span> <span class="pre">7</span> <span class="pre">mq_overview</span></tt>.</p> | ||
| <p><tt class="docutils literal"><span class="pre">maxsize</span></tt> if specified determines the maximum number of items | ||
| that be in the queue. Note that unlike Pythons's normal queue | ||
| type if this is greater than a system defined maximum then an | ||
| error is raised. If <tt class="docutils literal"><span class="pre">maxsize</span></tt> is zero then this maximum value | ||
| is used.</p> | ||
| <p><tt class="docutils literal"><span class="pre">msgsize</span></tt> if specified determines the maximum length that each | ||
| message can be (when expressed as a pickled string). If this | ||
| is greater than a system defined maximum then an error is | ||
| raised. If <tt class="docutils literal"><span class="pre">msgsize</span></tt> is zero then this maximum value is used.</p> | ||
| <p class="last">If one tries to send a message which is too long then | ||
| <tt class="docutils literal"><span class="pre">ValueError</span></tt> will be raised.</p> | ||
| </dd> | ||
| </dl> | ||
| </blockquote> | ||
| </div> | ||
| <div class="section"> | ||
| <h1><a id="synchronization-primitives" name="synchronization-primitives">Synchronization primitives</a></h1> | ||
| <p>Generally synchronization primitives are not a necessary in a | ||
| multiprocess program as they are in a mulithreaded program.</p> | ||
| <p>Generally synchronization primitives are not as necessary in a | ||
| multiprocess program as they are in a mulithreaded program. See the | ||
| documentation for the standard library's <tt class="docutils literal"><span class="pre">threading</span></tt> module.</p> | ||
| <p>Note that one can also create synchronization primitves by using a | ||
| manager object -- see <a class="reference" href="#managers">Managers</a>.</p> | ||
| <p>The following all require support for native sempahores from the | ||
| <tt class="docutils literal"><span class="pre">_processing</span></tt> extension.</p> | ||
| <blockquote> | ||
@@ -204,10 +118,4 @@ <dl class="docutils"> | ||
| <tt class="docutils literal"><span class="pre">SharedValue</span></tt>, <tt class="docutils literal"><span class="pre">SharedStruct</span></tt>, <tt class="docutils literal"><span class="pre">SharedArray</span></tt></blockquote> | ||
| <p>for creating objects stored in shared memory map. Also has | ||
| static methods</p> | ||
| <blockquote> | ||
| <tt class="docutils literal"><span class="pre">Lock</span></tt>, <tt class="docutils literal"><span class="pre">RLock</span></tt>, <tt class="docutils literal"><span class="pre">Semaphore</span></tt>, <tt class="docutils literal"><span class="pre">BoundedSemaphore</span></tt>, | ||
| <tt class="docutils literal"><span class="pre">Condition</span></tt>, <tt class="docutils literal"><span class="pre">Event</span></tt>, <tt class="docutils literal"><span class="pre">Queue</span></tt></blockquote> | ||
| <p>which are just aliases for other functions in the <tt class="docutils literal"><span class="pre">processing</span></tt> | ||
| namespace. See <a class="reference" href="manager-objects.html#shared-memory-managers">LocalManager</a>.</p> | ||
| <p class="last">Requires support for native semaphores from <tt class="docutils literal"><span class="pre">_processing</span></tt>.</p> | ||
| <p>for creating objects stored in shared memory map.</p> | ||
| <p class="last">See <a class="reference" href="manager-objects.html#shared-memory-managers">LocalManager</a>.</p> | ||
| </dd> | ||
@@ -222,19 +130,7 @@ <dt><tt class="docutils literal"><span class="pre">Manager()</span></tt></dt> | ||
| <blockquote> | ||
| <tt class="docutils literal"><span class="pre">list()</span></tt>, <tt class="docutils literal"><span class="pre">dict()</span></tt>, <tt class="docutils literal"><span class="pre">Namespace()</span></tt>, <tt class="docutils literal"><span class="pre">SharedValue()</span></tt>, | ||
| <tt class="docutils literal"><span class="pre">SharedStruct()</span></tt>, <tt class="docutils literal"><span class="pre">SharedArray()</span></tt>, <tt class="docutils literal"><span class="pre">Lock()</span></tt>, <tt class="docutils literal"><span class="pre">RLock()</span></tt>, | ||
| <tt class="docutils literal"><span class="pre">Semaphore()</span></tt>, <tt class="docutils literal"><span class="pre">BoundedSemaphore()</span></tt>, <tt class="docutils literal"><span class="pre">Condition()</span></tt>, | ||
| <tt class="docutils literal"><span class="pre">Event()</span></tt>, <tt class="docutils literal"><span class="pre">Queue()</span></tt>.</blockquote> | ||
| <p>For example:</p> | ||
| <pre class="literal-block"> | ||
| >>> from processing import Manager | ||
| >>> manager = Manager() | ||
| >>> l = manager.list(range(10)) | ||
| >>> l.reverse() | ||
| >>> print l | ||
| [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] | ||
| >>> print repr(l) | ||
| <Proxy[list] object at 0x00E1B3B0> | ||
| </pre> | ||
| <p class="last">See <a class="reference" href="manager-objects.html#syncmanager">SyncManager</a> and | ||
| <a class="reference" href="proxy-objects.html">Proxy objects</a>.</p> | ||
| <tt class="docutils literal"><span class="pre">list()</span></tt>, <tt class="docutils literal"><span class="pre">dict()</span></tt>, <tt class="docutils literal"><span class="pre">Namespace()</span></tt>, | ||
| <tt class="docutils literal"><span class="pre">SharedValue()</span></tt>, <tt class="docutils literal"><span class="pre">SharedStruct()</span></tt>, <tt class="docutils literal"><span class="pre">SharedArray()</span></tt>, | ||
| <tt class="docutils literal"><span class="pre">Lock()</span></tt>, <tt class="docutils literal"><span class="pre">RLock()</span></tt>, <tt class="docutils literal"><span class="pre">Semaphore()</span></tt>, <tt class="docutils literal"><span class="pre">BoundedSemaphore()</span></tt>, | ||
| <tt class="docutils literal"><span class="pre">Condition()</span></tt>, <tt class="docutils literal"><span class="pre">Event()</span></tt>, <tt class="docutils literal"><span class="pre">Queue()</span></tt>.</blockquote> | ||
| <p class="last">See <a class="reference" href="manager-objects.html#sync-manager">SyncManager</a>.</p> | ||
| </dd> | ||
@@ -256,18 +152,4 @@ </dl> | ||
| <p>If <tt class="docutils literal"><span class="pre">processes</span></tt> is <tt class="docutils literal"><span class="pre">None</span></tt> then the number returned by | ||
| <tt class="docutils literal"><span class="pre">cpuCount()</span></tt> is used. See <a class="reference" href="pool-objects.html">Pool objects</a>.</p> | ||
| <p>Example:</p> | ||
| <pre class="literal-block"> | ||
| from processing import Pool | ||
| def f(x): | ||
| return x*x | ||
| if __name__ == '__main__': | ||
| pool = Pool(processes=2) | ||
| result1 = pool.apply_async(f, (10,)) | ||
| result2 = pool.map_async(f, range(5)) | ||
| print result1.get() # => "100" | ||
| print result2.get(timeout=1) # => "[0, 1, 4, 9, 16]" | ||
| </pre> | ||
| <p class="last">Requires support for native semaphores from <tt class="docutils literal"><span class="pre">_processing</span></tt>.</p> | ||
| <tt class="docutils literal"><span class="pre">cpuCount()</span></tt> is used.</p> | ||
| <p class="last">See <a class="reference" href="pool-objects.html">Pool objects</a>.</p> | ||
| </dd> | ||
@@ -383,6 +265,9 @@ </dl> | ||
| <li><a class="reference" href="process-objects.html">Process objects</a></li> | ||
| <li><a class="reference" href="queue-objects.html">Queue objects</a></li> | ||
| <li><a class="reference" href="connection-objects.html">Connection objects</a></li> | ||
| <li><a class="reference" href="manager-objects.html">Manager objects</a></li> | ||
| <li><a class="reference" href="proxy-objects.html">Proxy objects</a></li> | ||
| <li><a class="reference" href="pool-objects.html">Pool objects</a></li> | ||
| <li><a class="reference" href="connection-ref.html">connection module</a></li> | ||
| <li><a class="reference" href="sharedctypes.html">Shared ctypes object</a></li> | ||
| <li><a class="reference" href="connection-ref.html">Listeners and Clients</a></li> | ||
| </ul> | ||
@@ -389,0 +274,0 @@ </div> |
+34
-162
@@ -25,3 +25,3 @@ .. include:: header.txt | ||
| Exception raised by the `recvbytes_into()` method of a | ||
| `connection object <connection-ref.html#connection-objects>`_ | ||
| `connection object <connection-objects.html>`_ | ||
| when the supplied buffer object is too small for the message | ||
@@ -50,119 +50,26 @@ read. | ||
| `Pipe()` | ||
| Returns a pair of connection objects representing the ends of | ||
| a duplex pipe. | ||
| `Pipe(duplex=True)` | ||
| Returns a pair `(conn1, conn2)` of connection objects | ||
| representing the ends of a pipe. | ||
| These connection objects can be inherited by child processes | ||
| and have methods `send()` and `recv()` (among others) for | ||
| sending and receiving picklable objects. (See `Connection | ||
| objects <connection-ref.html#connection-objects>`_.) For | ||
| example:: | ||
| If `duplex` is true then the pipe is two way; otherwise | ||
| `conn1` can only be used for receiving messages and `conn2` | ||
| can only be used for sending messages. | ||
| See `Connection objects <connection-objects.html>`_. | ||
| >>> from processing import Pipe | ||
| >>> a, b = Pipe() | ||
| >>> a.send([1, 'hello', None]) | ||
| >>> b.recv() | ||
| [1, 'hello', None] | ||
| >>> b.sendbytes('thank you') | ||
| >>> a.recvbytes() | ||
| 'thank you' | ||
| `Queue(maxsize=0)`, `SimpleQueue()`, `PosixQueue(maxsize=0)` | ||
| These functions return a process shared queue implemented in | ||
| different ways. The usual `Empty` and `Full` exceptions from | ||
| the `Queue` module are raised to signal timeouts. | ||
| Note that it is not safe to have more than one process (or | ||
| thread) reading or writing to the same end of a pipe at the | ||
| same time. | ||
| See `Queue objects <queue-objects.html>`_. | ||
| Also note that if a process is killed while it is trying to | ||
| read or write to a pipe then the data in the pipe is likely to | ||
| become corrupted (because it may become impossible to be sure | ||
| where the message boundaries lie). | ||
| On Windows this requires the `_processing` extension. | ||
| `Queue(maxsize=0)` | ||
| Returns a process shared queue implemented using a pipe and a | ||
| few locks/semaphores. A background thread transfers objects | ||
| from a buffer into the pipe. | ||
| It is a near clone of `Queue.Queue` except that the `qsize()` | ||
| method is not implemented and that the `task_done()` and | ||
| `join()` methods introduced in Python 2.5 are also missing. | ||
| `Queue` has a few additional methods: | ||
| `putmany(iterable)` | ||
| If the queue has infinite size then this adds all | ||
| items in the iterable to the queue's buffer. So | ||
| `q.putmany(X)` is a faster alternative to `for x in X: | ||
| q.put(x)`. Raises an error if the queue has finite | ||
| size. | ||
| `close()` | ||
| Indicates that no more data will be put on this queue | ||
| by the current process. The background thread will | ||
| quit once it has flushed all it data. This is called | ||
| automatically when the queue is garbage collected. | ||
| `jointhread()` | ||
| This joins the background thread and can only be used | ||
| after `close()` has been called. This blocks until | ||
| the background thread exits, ensuring that all data in | ||
| the buffer has been flushed to the pipe. | ||
| By default if a process is not the creator of the | ||
| queue then on exit it will attempt to join the queue's | ||
| background thread. The process can call | ||
| `canceljoin()` to prevent this behaviour. | ||
| `canceljoin()` | ||
| Prevents the background thread from being joined | ||
| automatically when the process exits. Unnecessary if | ||
| the current process created the queue. | ||
| Note that if a process is killed while it is trying to receive | ||
| or send to a queue then the data in the queue is likely to | ||
| become corrupted (because it may become impossible to be sure | ||
| where the message boundaries lie). | ||
| `SimpleQueue()` | ||
| A simplified and faster alternative to `Queue()`. It is | ||
| really just a pipe protected by a couple of locks. | ||
| It has `get()` and `put()` methods but these do not have | ||
| `block` or `timeout` arguments. It also has an `empty()` | ||
| method. | ||
| **Warning**: Unlike with `Queue` (when `maxsize` is zero) | ||
| using `put()` may block if the pipe's buffer does not have | ||
| sufficient space, so one must take care that no deadlocks are | ||
| possible. | ||
| `PosixQueue(maxsize=0, msgsize=0)` | ||
| A faster alternative to `Queue()` which is available on Unix | ||
| systems which support Posix message queues. | ||
| However, posix queues have a maximum number of messages that | ||
| can occupy the queue at a given time, and each message (when | ||
| expressed as a pickled string) has a maximum length --- see | ||
| `man 7 mq_overview`. | ||
| `maxsize` if specified determines the maximum number of items | ||
| that be in the queue. Note that unlike Pythons's normal queue | ||
| type if this is greater than a system defined maximum then an | ||
| error is raised. If `maxsize` is zero then this maximum value | ||
| is used. | ||
| `msgsize` if specified determines the maximum length that each | ||
| message can be (when expressed as a pickled string). If this | ||
| is greater than a system defined maximum then an error is | ||
| raised. If `msgsize` is zero then this maximum value is used. | ||
| If one tries to send a message which is too long then | ||
| `ValueError` will be raised. | ||
| Synchronization primitives | ||
| -------------------------- | ||
| Generally synchronization primitives are not a necessary in a | ||
| multiprocess program as they are in a mulithreaded program. | ||
| Generally synchronization primitives are not as necessary in a | ||
| multiprocess program as they are in a mulithreaded program. See the | ||
| documentation for the standard library's `threading` module. | ||
@@ -172,5 +79,2 @@ Note that one can also create synchronization primitves by using a | ||
| The following all require support for native sempahores from the | ||
| `_processing` extension. | ||
| `BoundedSemaphore(value=1)` | ||
@@ -217,13 +121,6 @@ Returns a bounded semaphore object: a clone of | ||
| for creating objects stored in shared memory map. Also has | ||
| static methods | ||
| `Lock`, `RLock`, `Semaphore`, `BoundedSemaphore`, | ||
| `Condition`, `Event`, `Queue` | ||
| for creating objects stored in shared memory map. | ||
| which are just aliases for other functions in the `processing` | ||
| namespace. See `LocalManager | ||
| <manager-objects.html#shared-memory-managers>`_. | ||
| See `LocalManager <manager-objects.html#shared-memory-managers>`_. | ||
| Requires support for native semaphores from `_processing`. | ||
@@ -239,22 +136,10 @@ `Manager()` | ||
| `list()`, `dict()`, `Namespace()`, `SharedValue()`, | ||
| `SharedStruct()`, `SharedArray()`, `Lock()`, `RLock()`, | ||
| `Semaphore()`, `BoundedSemaphore()`, `Condition()`, | ||
| `Event()`, `Queue()`. | ||
| `list()`, `dict()`, `Namespace()`, | ||
| `SharedValue()`, `SharedStruct()`, `SharedArray()`, | ||
| `Lock()`, `RLock()`, `Semaphore()`, `BoundedSemaphore()`, | ||
| `Condition()`, `Event()`, `Queue()`. | ||
| For example:: | ||
| See `SyncManager <manager-objects.html#sync-manager>`_. | ||
| >>> from processing import Manager | ||
| >>> manager = Manager() | ||
| >>> l = manager.list(range(10)) | ||
| >>> l.reverse() | ||
| >>> print l | ||
| [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] | ||
| >>> print repr(l) | ||
| <Proxy[list] object at 0x00E1B3B0> | ||
| See `SyncManager <manager-objects.html#syncmanager>`_ and | ||
| `Proxy objects`_. | ||
| Process Pools | ||
@@ -274,20 +159,5 @@ ------------- | ||
| If `processes` is `None` then the number returned by | ||
| `cpuCount()` is used. See `Pool objects | ||
| <pool-objects.html>`_. | ||
| `cpuCount()` is used. | ||
| Example:: | ||
| from processing import Pool | ||
| def f(x): | ||
| return x*x | ||
| if __name__ == '__main__': | ||
| pool = Pool(processes=2) | ||
| result1 = pool.apply_async(f, (10,)) | ||
| result2 = pool.map_async(f, range(5)) | ||
| print result1.get() # => "100" | ||
| print result2.get(timeout=1) # => "[0, 1, 4, 9, 16]" | ||
| Requires support for native semaphores from `_processing`. | ||
| See `Pool objects <pool-objects.html>`_. | ||
@@ -403,11 +273,13 @@ | ||
| * `Process objects <process-objects.html>`_ | ||
| * `Manager objects <manager-objects.html>`_ | ||
| * `Proxy objects <proxy-objects.html>`_ | ||
| * `Pool objects <pool-objects.html>`_ | ||
| * `connection module <connection-ref.html>`_ | ||
| + `Process objects <process-objects.html>`_ | ||
| + `Queue objects <queue-objects.html>`_ | ||
| + `Connection objects <connection-objects.html>`_ | ||
| + `Manager objects <manager-objects.html>`_ | ||
| + `Proxy objects <proxy-objects.html>`_ | ||
| + `Pool objects <pool-objects.html>`_ | ||
| + `Shared ctypes object <sharedctypes.html>`_ | ||
| + `Listeners and Clients <connection-ref.html>`_ | ||
| .. _Prev: intro.html | ||
| .. _Up: index.html | ||
| .. _Next: process-objects.html |
@@ -45,7 +45,2 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| <tt class="docutils literal"><span class="pre">ProcessExit</span></tt> is a subclass of <tt class="docutils literal"><span class="pre">SystemExit</span></tt>.</dd> | ||
| <dt><em>Not letting the manager die early</em></dt> | ||
| <dd>When the main process terminates any manager process it has | ||
| created will also terminate. Make sure that no child process | ||
| still needs the manager when this happens --- the easiest way is | ||
| just to join all child processes you create.</dd> | ||
| <dt><em>Joining zombie processes</em></dt> | ||
@@ -56,15 +51,46 @@ <dd>On Unix when a process finishes but has not been joined it becomes | ||
| processes which have not yet been joined will be joined. Also | ||
| calling a finished process's <tt class="docutils literal"><span class="pre">isAlive()</span></tt> will join the process | ||
| will already have been joined. Even so it is probably good | ||
| practice to explicitly join all the processes that you start.</dd> | ||
| calling a finished process's <tt class="docutils literal"><span class="pre">isAlive()</span></tt> will join the process. | ||
| Even so it is probably good practice to explicitly join all the | ||
| processes that you start.</dd> | ||
| <dt><em>Better to inherit than pickle/unpickle</em></dt> | ||
| <dd><p class="first">On Windows many of types from the <tt class="docutils literal"><span class="pre">processing</span></tt> package need to be | ||
| picklable so that child processes can use them. However, on Unix | ||
| the following types are not picklable:</p> | ||
| <blockquote> | ||
| <tt class="docutils literal"><span class="pre">Lock</span></tt>, <tt class="docutils literal"><span class="pre">RLock</span></tt>, <tt class="docutils literal"><span class="pre">Semaphore</span></tt>, <tt class="docutils literal"><span class="pre">BoundedSemaphore</span></tt>, | ||
| <tt class="docutils literal"><span class="pre">Condition</span></tt>, <tt class="docutils literal"><span class="pre">Event</span></tt>, <tt class="docutils literal"><span class="pre">Queue</span></tt>, <tt class="docutils literal"><span class="pre">SharedValue</span></tt>, | ||
| <tt class="docutils literal"><span class="pre">SharedStruct</span></tt>, <tt class="docutils literal"><span class="pre">SharedArray</span></tt>.</blockquote> | ||
| <p class="last">For the sake of compatibility it is better not to rely on these | ||
| types being picklable.</p> | ||
| <dd>On Windows many of types from the <tt class="docutils literal"><span class="pre">processing</span></tt> package need to be | ||
| picklable so that child processes can use them. However, one | ||
| should generally avoid sending shared objects to other processes | ||
| using pipes or queues. Instead you should arrange the program so | ||
| that a process which need access to a shared resource created | ||
| elsewhere can inherit it from an ancestor process.</dd> | ||
| <dt><em>Explicity pass resources to child processes</em></dt> | ||
| <dd><p class="first">On Unix a child process can make use of a shared resource created | ||
| in a parent process using a global resource. However, it is | ||
| better to pass the object as an argument to the constructor for | ||
| the child process.</p> | ||
| <p>Apart from making the code (potentially) compatible with Windows | ||
| this also ensures that as long as the child process is still alive | ||
| the object will not be garbage collected in the parent process. | ||
| This might be important if some resource is freed when the object | ||
| is garbage collected in the parent process.</p> | ||
| <p>So for instance</p> | ||
| <pre class="literal-block"> | ||
| from processing import Process, Lock | ||
| def f(): | ||
| ... do something using "lock" ... | ||
| if __name__ == '__main__': | ||
| lock = Lock() | ||
| for i in range(10): | ||
| Process(target=f).start() | ||
| </pre> | ||
| <p>should be rewritten as</p> | ||
| <pre class="last literal-block"> | ||
| from processing import Process, Lock | ||
| def f(l): | ||
| ... do something using "l" ... | ||
| if __name__ == '__main__': | ||
| lock = Lock() | ||
| for i in range(10): | ||
| Process(target=f, args=[lock]).start() | ||
| </pre> | ||
| </dd> | ||
@@ -71,0 +97,0 @@ </dl> |
@@ -42,8 +42,2 @@ .. include:: header.txt | ||
| *Not letting the manager die early* | ||
| When the main process terminates any manager process it has | ||
| created will also terminate. Make sure that no child process | ||
| still needs the manager when this happens --- the easiest way is | ||
| just to join all child processes you create. | ||
| *Joining zombie processes* | ||
@@ -54,19 +48,51 @@ On Unix when a process finishes but has not been joined it becomes | ||
| processes which have not yet been joined will be joined. Also | ||
| calling a finished process's `isAlive()` will join the process | ||
| will already have been joined. Even so it is probably good | ||
| practice to explicitly join all the processes that you start. | ||
| calling a finished process's `isAlive()` will join the process. | ||
| Even so it is probably good practice to explicitly join all the | ||
| processes that you start. | ||
| *Better to inherit than pickle/unpickle* | ||
| On Windows many of types from the `processing` package need to be | ||
| picklable so that child processes can use them. However, on Unix | ||
| the following types are not picklable: | ||
| picklable so that child processes can use them. However, one | ||
| should generally avoid sending shared objects to other processes | ||
| using pipes or queues. Instead you should arrange the program so | ||
| that a process which need access to a shared resource created | ||
| elsewhere can inherit it from an ancestor process. | ||
| `Lock`, `RLock`, `Semaphore`, `BoundedSemaphore`, | ||
| `Condition`, `Event`, `Queue`, `SharedValue`, | ||
| `SharedStruct`, `SharedArray`. | ||
| *Explicity pass resources to child processes* | ||
| On Unix a child process can make use of a shared resource created | ||
| in a parent process using a global resource. However, it is | ||
| better to pass the object as an argument to the constructor for | ||
| the child process. | ||
| For the sake of compatibility it is better not to rely on these | ||
| types being picklable. | ||
| Apart from making the code (potentially) compatible with Windows | ||
| this also ensures that as long as the child process is still alive | ||
| the object will not be garbage collected in the parent process. | ||
| This might be important if some resource is freed when the object | ||
| is garbage collected in the parent process. | ||
| So for instance :: | ||
| from processing import Process, Lock | ||
| def f(): | ||
| ... do something using "lock" ... | ||
| if __name__ == '__main__': | ||
| lock = Lock() | ||
| for i in range(10): | ||
| Process(target=f).start() | ||
| should be rewritten as :: | ||
| from processing import Process, Lock | ||
| def f(l): | ||
| ... do something using "l" ... | ||
| if __name__ == '__main__': | ||
| lock = Lock() | ||
| for i in range(10): | ||
| Process(target=f, args=[lock]).start() | ||
| Windows | ||
@@ -73,0 +99,0 @@ ------- |
+0
-3
@@ -62,5 +62,2 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| <dd>Some simple benchmarks comparing <tt class="docutils literal"><span class="pre">processing</span></tt> with <tt class="docutils literal"><span class="pre">threading</span></tt>.</dd> | ||
| <dt><a class="reference" href="../test/test_with.py">test_with.py</a></dt> | ||
| <dd>The same as <tt class="docutils literal"><span class="pre">test_processing</span></tt> but uses Python 2.5's <tt class="docutils literal"><span class="pre">with</span></tt> | ||
| statement.</dd> | ||
| </dl> | ||
@@ -67,0 +64,0 @@ </blockquote> |
+0
-4
@@ -62,6 +62,2 @@ .. include:: header.txt | ||
| `test_with.py <../test/test_with.py>`_ | ||
| The same as `test_processing` but uses Python 2.5's `with` | ||
| statement. | ||
| All the modules in the `test` folder use `freezeSupport()` so frozen | ||
@@ -68,0 +64,0 @@ executables can be produced from them by using `py2exe`, `PyInstaller` |
+24
-4
@@ -24,2 +24,4 @@ <?xml version="1.0" encoding="utf-8" ?> | ||
| </tr> | ||
| <tr><th class="docinfo-name">Version:</th> | ||
| <td>0.39</td></tr> | ||
| <tr class="field"><th class="docinfo-name">Licence:</th><td class="field-body">BSD Licence</td> | ||
@@ -32,6 +34,13 @@ </tr> | ||
| <tt class="docutils literal"><span class="pre">threading</span></tt> module. It runs on both Unix and Windows.</p> | ||
| <p>Objects can be transferred between processes using pipes or queues, | ||
| and objects can be shared between processes using a server process or | ||
| (for simple data) shared memory. Equivalents of the synchronization | ||
| primitives in <tt class="docutils literal"><span class="pre">threading</span></tt> are also provided.</p> | ||
| <p><em>Features</em>:</p> | ||
| <ul class="simple"> | ||
| <li>Objects can be transferred between processes using pipes or | ||
| multi-producer/multi-consumer queues.</li> | ||
| <li>Objects can be shared between processes using a server process or | ||
| (for simple data) shared memory.</li> | ||
| <li>Equivalents of all the synchronization primitives in <tt class="docutils literal"><span class="pre">threading</span></tt> | ||
| are available.</li> | ||
| <li>A <tt class="docutils literal"><span class="pre">Pool</span></tt> class makes it easy to submit tasks to a pool of worker | ||
| processes.</li> | ||
| </ul> | ||
| <div class="section"> | ||
@@ -97,2 +106,13 @@ <h1><a id="links" name="links">Links</a></h1> | ||
| </pre> | ||
| <p>Tasks can be offloaded to a pool of worker processes in various ways, | ||
| for example</p> | ||
| <pre class="literal-block"> | ||
| >>> from processing import Pool | ||
| >>> def f(x): return x*x | ||
| ... | ||
| >>> p = Pool(4) | ||
| >>> result = p.map_async(f, range(10)) | ||
| >>> print result.get(timeout=1) | ||
| [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] | ||
| </pre> | ||
| <a href="http://developer.berlios.de" title="BerliOS Developer"> | ||
@@ -99,0 +119,0 @@ <img src="http://developer.berlios.de/bslogo.php?group_id=9001" |
+97
-359
@@ -0,1 +1,10 @@ | ||
| # | ||
| # Module providing the `LocalManager` class for dealing | ||
| # with shared objects in shared memory | ||
| # | ||
| # processing/localmanager.py | ||
| # | ||
| # Copyright (c) 2006, 2007, R Oudkerk --- see COPYING.txt | ||
| # | ||
| import mmap | ||
@@ -6,6 +15,6 @@ import re | ||
| import tempfile | ||
| import array | ||
| from processing import synchronize, process, queue, _processing | ||
| from processing import synchronize, process, queue, heap, _processing | ||
| from struct import pack as _pack, unpack as _unpack, calcsize as _calcsize | ||
| from array import array | ||
@@ -15,218 +24,12 @@ __all__ = [ 'LocalManager' ] | ||
| # | ||
| # Heap implementation | ||
| # | ||
| # | ||
| class Heap(object): | ||
| ''' | ||
| Class for allocating and freeing memory from an mmap | ||
| ''' | ||
| def __init__(self, size, lock=None): | ||
| fd, self._name = tempfile.mkstemp(prefix='pym-') | ||
| self._size = remaining = size | ||
| while remaining > 0: | ||
| remaining -= os.write(fd, '\0' * remaining) | ||
| self._lock = lock or RLock() | ||
| self._mmap = mmap.mmap(fd, self._size) | ||
| os.close(fd) | ||
| self._set_pos(8) | ||
| try: | ||
| from struct import Struct | ||
| from functools import partial | ||
| _struct_cache = {} | ||
| except ImportError: | ||
| Struct = None | ||
| if sys.platform in ('win32', 'cygwin'): | ||
| process.Finalize(self, Heap._finalize_heap, | ||
| args=[self._mmap, self._name], atexit=True) | ||
| else: | ||
| os.unlink(self._name) | ||
| def __getstate__(self): | ||
| if sys.platform != 'win32': | ||
| raise NotImplemented | ||
| return self._size, self._name, self._lock | ||
| def __setstate__(self, state): | ||
| self._size, self._name, self._lock = state | ||
| fd = os.open(self._name, os.O_RDWR, 0600) | ||
| self._mmap = mmap.mmap(fd, self._size) | ||
| os.close(fd) | ||
| @staticmethod | ||
| def _finalize_heap(mmap, name): | ||
| import os | ||
| mmap.close() | ||
| try: | ||
| os.unlink(name) | ||
| except WindowsError: | ||
| process.info('failed to unlink %r -- try again later', name) | ||
| process._trylater.append((os.unlink, [name])) | ||
| def malloc(self, bytes): | ||
| ''' | ||
| Allocate chunk of memory of given size | ||
| ''' | ||
| bytes += 4 # extra room for header info | ||
| bytes = ((bytes + 3) // 4) * 4 # round up to multiple of 4 | ||
| self._lock.acquire() | ||
| try: | ||
| offset = self._malloc(bytes) | ||
| return offset + 4 | ||
| finally: | ||
| self._lock.release() | ||
| def free(self, offset): | ||
| ''' | ||
| Free a chunk of memory previously allocated by malloc | ||
| ''' | ||
| self._lock.acquire() | ||
| try: | ||
| self._free(offset - 4) | ||
| finally: | ||
| self._lock.release() | ||
| def _malloc(self, bytes): | ||
| ''' | ||
| Allocate chunk of memory -- `bytes` must be multiple of 4. | ||
| First 4 bytes are occupied by header info so bytes must be at least 4 | ||
| ''' | ||
| # resize mmap if the file has been enlarged by other process | ||
| if self._size < self._mmap.size(): | ||
| self._mmap.resize(self._mmap.size()) | ||
| self._size = len(self._mmap) | ||
| start = stop = None | ||
| # iterate over empty chunks | ||
| for start, stop in self._get_empty_chunks(): | ||
| # if chunk is large enough | ||
| if stop - start >= bytes: | ||
| # write header for the chunk we are creating | ||
| self._set_chunk_info(start, start + bytes, True) | ||
| # write header for following empty chunk if necessary | ||
| if start + bytes < stop: | ||
| self._set_chunk_info(start + bytes, stop, False) | ||
| # update position of last empty chunk | ||
| if start == self._get_pos(): | ||
| try: | ||
| pos, _ = self._get_empty_chunks().next() | ||
| except StopIteration: | ||
| pos = self._size | ||
| self._set_pos(pos) | ||
| return start | ||
| # get position of memory just after last occupied chunk | ||
| if stop == self._size: | ||
| pos = start | ||
| else: | ||
| pos = self._size | ||
| # work out how much space we need and resize mmap | ||
| while pos + bytes > self._size: | ||
| self._size *= 2 | ||
| if self._size > len(self._mmap): | ||
| self._mmap.resize(self._size) | ||
| # mark chunk as occupied and update position of last empty chunk | ||
| self._set_chunk_info(pos, pos + bytes, True) | ||
| if pos == self._get_pos(): | ||
| self._set_pos(pos + bytes) | ||
| return pos | ||
| def _free(self, start): | ||
| ''' | ||
| Free chunk of memory -- must have been allocated by _malloc | ||
| ''' | ||
| stop, occ = self._get_chunk_info(start) | ||
| assert occ | ||
| # mark chunk as not occupied | ||
| self._mmap[start:stop] = '\0' * (stop - start) | ||
| self._set_chunk_info(start, stop, False) | ||
| # if this chunk is before previous first empty chunk | ||
| if self._get_pos() > start: | ||
| # record start as position of first empty chunk | ||
| self._set_pos(start) | ||
| def _get_chunks(self, start=8): | ||
| ''' | ||
| Iterator over chunks of memory. | ||
| Returns (start, stop, occupied) tuples. | ||
| ''' | ||
| while start < self._size: | ||
| stop, occ = self._get_chunk_info(start) | ||
| yield start, stop, occ | ||
| start = stop | ||
| def _get_empty_chunks(self): | ||
| ''' | ||
| Iterator over empty chunks of memory. | ||
| When possible adjacent empty chunks are merged. | ||
| Returns (start, stop) pairs. | ||
| ''' | ||
| pos = self._get_pos() | ||
| it = self._get_chunks(pos) | ||
| for start, stop, occ in it: | ||
| if not occ: | ||
| merge = False | ||
| for nstart, nstop, occ in it: | ||
| if occ: | ||
| break | ||
| stop = nstart = nstop | ||
| merge = True | ||
| if merge: | ||
| self._mmap[start:stop] = '\0' * (stop - start) | ||
| self._set_chunk_info(start, stop, False) | ||
| yield start, stop | ||
| def _get_pos(self): | ||
| ''' | ||
| Return offset of first empty chunk of memory (or self._size if none) | ||
| ''' | ||
| return _unpack('i', self._mmap[0:4])[0] | ||
| def _set_pos(self, pos): | ||
| ''' | ||
| Set offset of first empty chunk of memory | ||
| ''' | ||
| self._mmap[0:4] = _pack('i', pos) | ||
| def _get_chunk_info(self, start): | ||
| ''' | ||
| Get info at beginning of a chunk of memory starting at start | ||
| Returns (stop, isoccupied) pair | ||
| ''' | ||
| info = _unpack('i', self._mmap[start:start+4])[0] | ||
| stop = abs(info) | ||
| isoccupied = info > 0 | ||
| if stop == 0: | ||
| assert not isoccupied | ||
| return self._size, False | ||
| return stop, isoccupied | ||
| def _set_chunk_info(self, start, stop, occ): | ||
| ''' | ||
| Set info at begining of a chunk of memory | ||
| ''' | ||
| assert stop > 0 | ||
| if not occ: | ||
| stop = -stop | ||
| self._mmap[start:start+4] = _pack('i', stop) | ||
| def _dump(self): | ||
| ''' | ||
| Print hex dump of mmap | ||
| ''' | ||
| assert len(self._mmap) % 4 == 0 | ||
| for i in range(len(self._mmap) // 4): | ||
| if i % 8 == 0: | ||
| print '%08x' % _unpack('I', self._mmap[4*i:4*i+4]), | ||
| # | ||
@@ -237,30 +40,35 @@ # Class for a struct which lives in shared memory | ||
| class SharedStruct(object): | ||
| def __init__(self, format, value, lock): | ||
| wrapper = heap.BufferWrapper(_calcsize(format)) | ||
| self.__setstate__((wrapper, format, lock)) | ||
| self.set(value) | ||
| def __init__(self, heap, format, value, _lock=None, _start=None): | ||
| self._heap = heap | ||
| self._format = format | ||
| size = _calcsize(format) | ||
| if _start is None: | ||
| self._start = heap.malloc(size) | ||
| assert format is not None | ||
| else: | ||
| self._start = _start | ||
| self._stop = self._start + size | ||
| self._lock = _lock or self._heap._lock | ||
| def __getstate__(self): | ||
| assert sys.platform == 'win32' | ||
| return self._wrapper, self._format, self._lock | ||
| def __setstate__(self, state): | ||
| self._wrapper, self._format, self._lock = state | ||
| self._buffer = self._wrapper.getview() | ||
| self._acquire = self._lock.acquire | ||
| self._release = self._lock.release | ||
| self._mmap = self._heap._mmap | ||
| if Struct: | ||
| try: | ||
| s = _struct_cache[self._format] | ||
| except KeyError: | ||
| s = Struct(self._format) | ||
| self._get = partial(s.unpack_from, self._buffer) | ||
| self._set = partial(s.pack_into, self._buffer, 0) | ||
| if _start is None: | ||
| self.set(value) | ||
| process.Finalize( | ||
| self, self._heap.free, args=[self._start], atexit=True | ||
| ) | ||
| def _get(self): | ||
| return _unpack(self._format, self._buffer[:]) | ||
| def _set(self, *args): | ||
| self._buffer[:] = _pack(self._format, *args) | ||
| def get(self): | ||
| self._acquire() | ||
| try: | ||
| return _unpack(self._format, self._mmap[self._start:self._stop]) | ||
| return self._get() | ||
| finally: | ||
@@ -272,3 +80,3 @@ self._release() | ||
| try: | ||
| self._mmap[self._start:self._stop] = _pack(self._format, *value) | ||
| self._set(*value) | ||
| finally: | ||
@@ -282,8 +90,2 @@ self._release() | ||
| if sys.platform == 'win32': | ||
| def __reduce__(self): | ||
| return type(self), (self._heap, self._format, None, | ||
| self._lock, self._start) | ||
| # | ||
@@ -295,10 +97,6 @@ # Class for a length 1 struct which lives in shared memory | ||
| def __init__(self, heap, format, value, _lock=None, _start=None): | ||
| assert len(_unpack(format, '\0' * _calcsize(format))) == 1 | ||
| SharedStruct.__init__(self, heap, format, value, _lock, _start) | ||
| def get(self): | ||
| self._acquire() | ||
| try: | ||
| return _unpack(self._format, self._mmap[self._start:self._stop])[0] | ||
| return self._get()[0] | ||
| finally: | ||
@@ -310,3 +108,3 @@ self._release() | ||
| try: | ||
| self._mmap[self._start:self._stop] = _pack(self._format, value) | ||
| self._set(value) | ||
| finally: | ||
@@ -322,43 +120,30 @@ self._release() | ||
| class SharedArray(object): | ||
| def __init__(self, format, sequence, lock): | ||
| wrapper = heap.BufferWrapper(_calcsize(format) * len(sequence)) | ||
| self.__setstate__((wrapper, format, lock)) | ||
| self[:] = sequence | ||
| def __init__(self, heap, format, sequence, | ||
| _lock=None, _start=None, _length=None): | ||
| self._heap = heap | ||
| self._format = format | ||
| self._itemsize = _calcsize(format) | ||
| def __getstate__(self): | ||
| assert sys.platform == 'win32' | ||
| return self._wrapper, self._format, self._lock | ||
| if _start is None: | ||
| sequence = tuple(sequence) | ||
| self._length = len(sequence) | ||
| self._start = heap.malloc(self._itemsize * self._length) | ||
| else: | ||
| self._start = _start | ||
| self._length = _length | ||
| self._stop = self._start + self._itemsize * self._length | ||
| self._lock = _lock or self._heap._lock | ||
| def __setstate__(self, state): | ||
| self._wrapper, self._format, self._lock = state | ||
| self._buffer = self._wrapper.getview() | ||
| self._acquire = self._lock.acquire | ||
| self._release = self._lock.release | ||
| self._mmap = self._heap._mmap | ||
| self._itemsize = _calcsize(self._format) | ||
| self._length, rem = divmod(len(self._buffer), self._itemsize) | ||
| assert rem == 0 | ||
| if _start is None: | ||
| self[:] = sequence | ||
| process.Finalize( | ||
| self, self._heap.free, args=[self._start], atexit=True | ||
| ) | ||
| def __len__(self): | ||
| return self._length | ||
| def __iter__(self): | ||
| return iter(array.array(self._format, | ||
| self._mmap[self._start:self._stop])) | ||
| def __getitem__(self, i): | ||
| self._acquire() | ||
| try: | ||
| assert 0 <= i < self._length | ||
| t = self._start + self._itemsize * i | ||
| return _unpack(self._format, self._mmap[t:t+self._itemsize])[0] | ||
| a = i * self._itemsize | ||
| b = a + self._itemsize | ||
| return _unpack(self._format, self._buffer[a:b])[0] | ||
| finally: | ||
@@ -370,5 +155,5 @@ self._release() | ||
| try: | ||
| assert 0 <= i < self._length | ||
| t = self._start + self._itemsize * i | ||
| self._mmap[t:t+self._itemsize] = _pack(self._format, value) | ||
| a = i * self._itemsize | ||
| b = a + self._itemsize | ||
| self._buffer[a:b] = _pack(self._format, value) | ||
| finally: | ||
@@ -380,6 +165,5 @@ self._release() | ||
| try: | ||
| at = self._start + self._itemsize * a | ||
| bt = self._start + self._itemsize * b | ||
| bt = min(bt, self._stop) | ||
| return array.array(self._format, self._mmap[at:bt]) | ||
| at = self._itemsize * a | ||
| bt = self._itemsize * b | ||
| return array(self._format, self._buffer[at:bt]) | ||
| finally: | ||
@@ -391,6 +175,5 @@ self._release() | ||
| try: | ||
| at = self._start + self._itemsize * a | ||
| bt = self._start + self._itemsize * b | ||
| bt = min(bt, self._stop) | ||
| self._mmap[at:bt] = array.array(self._format, seq).tostring() | ||
| at = self._itemsize * a | ||
| bt = self._itemsize * b | ||
| self._buffer[at:bt] = array(self._format, seq).tostring() | ||
| finally: | ||
@@ -402,3 +185,3 @@ self._release() | ||
| try: | ||
| return self._mmap[self._start:self._stop] | ||
| return self._buffer[:] | ||
| finally: | ||
@@ -410,7 +193,6 @@ self._release() | ||
| try: | ||
| s = self._mmap[self._start:self._stop] | ||
| arr = array.array(self._format, s) | ||
| return list(arr) | ||
| arr = array(self._format, self._buffer[:]) | ||
| finally: | ||
| self._release() | ||
| return list(arr) | ||
@@ -425,8 +207,2 @@ def __repr__(self): | ||
| if sys.platform == 'win32': | ||
| def __reduce__(self): | ||
| return type(self), (self._heap, self._format, None, | ||
| self._lock, self._start, self._length) | ||
| # | ||
@@ -438,5 +214,4 @@ # LocalManager | ||
| def __init__(self, size=256): | ||
| def __init__(self): | ||
| self._lock = synchronize.RLock() | ||
| self._size = size | ||
@@ -469,19 +244,10 @@ def start(self): | ||
| def _getheap(self): | ||
| self._lock.acquire() | ||
| try: | ||
| if not hasattr(self, '_heap'): | ||
| self._heap = Heap(size=self._size, lock=self._lock) | ||
| return self._heap | ||
| finally: | ||
| self._lock.release() | ||
| def SharedValue(self, format, value): | ||
| return SharedValue(self._getheap(), format, value) | ||
| return SharedValue(format, value, self._lock) | ||
| def SharedStruct(self, format, value): | ||
| return SharedStruct(self._getheap(), format, value) | ||
| return SharedStruct(format, value, self._lock) | ||
| def SharedArray(self, format, sequence): | ||
| return SharedArray(self._getheap(), format, sequence) | ||
| return SharedArray(format, sequence, self._lock) | ||
@@ -492,54 +258,26 @@ # | ||
| def test0(): | ||
| m = LocalManager() | ||
| a = [] | ||
| for i in range(10): | ||
| a.append(m.SharedValue('i', i)) | ||
| print list(m._heap._get_empty_chunks()) | ||
| del a[3:8] | ||
| print list(m._heap._get_empty_chunks()) | ||
| for i in range(100): | ||
| a.append(m.SharedValue('i', 0x100 + i)) | ||
| print list(m._heap._get_empty_chunks()) | ||
| m._heap._dump() | ||
| def _test(x, y, z): | ||
| x.value = 42 | ||
| y.value = (1729, 3.1415927) | ||
| for i in range(len(z)): | ||
| z[i] *= 2 | ||
| def test(): | ||
| ''' | ||
| Randomly create and delete shared arrays of chars | ||
| from processing import Process | ||
| Check no corruption caused | ||
| ''' | ||
| import random | ||
| m = LocalManager() | ||
| manager = LocalManager() | ||
| x = m.SharedValue('i', 0) | ||
| y = m.SharedStruct('id', (0, 0)) | ||
| z = m.SharedArray('d', range(10)) | ||
| L = [] | ||
| maxcount = 10 | ||
| p = Process(target=_test, args=(x, y, z)) | ||
| p.start() | ||
| p.join() | ||
| for k in range(1000): | ||
| i = random.randrange(100) | ||
| arr = ''.join([chr(j) for j in range(i)]) | ||
| sarr = manager.SharedArray('c', arr) | ||
| L.append((arr, sarr)) | ||
| if len(L) > maxcount: | ||
| j = random.randrange(maxcount) | ||
| del L[j] | ||
| print 'Heap size =', manager._heap._size | ||
| print 'Empty chunks =', list(manager._getheap()._get_empty_chunks()) | ||
| print x | ||
| print y | ||
| print z | ||
| for arr, sarr in L: | ||
| assert arr == ''.join(sarr) | ||
| print 'Tests passed' | ||
| if __name__ == '__main__': | ||
| test() |
+10
-7
@@ -18,4 +18,3 @@ # | ||
| import threading, os, sys, weakref, traceback, array, copy_reg, cPickle | ||
| import process | ||
| from processing import process | ||
| from processing.connection import Listener, Client, AuthenticationError | ||
@@ -384,6 +383,10 @@ from processing.connection import deliver_challenge, answer_challenge | ||
| info('manager received shutdown message') | ||
| Finalize._run_all_finalizers() | ||
| Finalize._run_all_finalizers(0) | ||
| for p in activeChildren(): | ||
| debug('terminating a child process of manager') | ||
| p.terminate() | ||
| for p in activeChildren(): | ||
| debug('terminating a child process of manager') | ||
| p.join() | ||
| Finalize._run_all_finalizers() | ||
| info('process exiting with `os.exit(0)`') | ||
@@ -477,6 +480,6 @@ os._exit(0) | ||
| self._process.setAuthKey(self._authkey) | ||
| try: | ||
| self._process.setDaemon(True) | ||
| except AssertionError: | ||
| pass | ||
| ## try: | ||
| ## self._process.setDaemon(True) | ||
| ## except AssertionError: | ||
| ## pass | ||
| self._process.start() | ||
@@ -483,0 +486,0 @@ |
+26
-7
| Metadata-Version: 1.0 | ||
| Name: processing | ||
| Version: 0.38 | ||
| Version: 0.39 | ||
| Summary: Package for using processes mimicking the `threading` module | ||
@@ -9,13 +9,21 @@ Home-page: http://developer.berlios.de/projects/pyprocessing | ||
| License: BSD Licence | ||
| Description: | ||
| ``processing`` is a package for the Python language which supports the | ||
| Description: ``processing`` is a package for the Python language which supports the | ||
| spawning of processes using the API of the standard library's | ||
| ``threading`` module. It runs on both Unix and Windows. | ||
| Objects can be transferred between processes using pipes or queues, | ||
| and objects can be shared between processes using a server process or | ||
| (for simple data) shared memory. Equivalents of the synchronization | ||
| primitives in ``threading`` are also provided. | ||
| *Features*: | ||
| * Objects can be transferred between processes using pipes or | ||
| multi-producer/multi-consumer queues. | ||
| * Objects can be shared between processes using a server process or | ||
| (for simple data) shared memory. | ||
| * Equivalents of all the synchronization primitives in ``threading`` | ||
| are available. | ||
| * A ``Pool`` class makes it easy to submit tasks to a pool of worker | ||
| processes. | ||
| Links | ||
@@ -82,4 +90,15 @@ ===== | ||
| Tasks can be offloaded to a pool of worker processes in various ways, | ||
| for example :: | ||
| >>> from processing import Pool | ||
| >>> def f(x): return x*x | ||
| ... | ||
| >>> p = Pool(4) | ||
| >>> result = p.map_async(f, range(10)) | ||
| >>> print result.get(timeout=1) | ||
| [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] | ||
| Platform: Unix and Windows | ||
@@ -86,0 +105,0 @@ Classifier: Development Status :: 4 - Beta |
+46
-25
@@ -20,4 +20,5 @@ # | ||
| import collections | ||
| import time | ||
| from processing.process import debug | ||
| from processing.process import debug, _sleep_until_neq | ||
@@ -58,3 +59,3 @@ # | ||
| debug('worker got None -- exiting') | ||
| debug('worker got sentinel -- exiting') | ||
@@ -111,5 +112,6 @@ # | ||
| self, Pool._terminate_pool, | ||
| args=[self._taskqueue, self._outqueue, self._cache, self._pool, | ||
| self._task_handler, self._result_handler], | ||
| atexit=True | ||
| args=[self._taskqueue, self._inqueue, self._outqueue, | ||
| self._cache, self._pool, self._task_handler, | ||
| self._result_handler], | ||
| atexit=True, priority=5 | ||
| ) | ||
@@ -201,21 +203,25 @@ | ||
| for taskseq, setlength in iter(taskqueue.get, None): | ||
| if thread._state: | ||
| debug('task handler found thread._state != RUN -- exiting') | ||
| return | ||
| i = -1 | ||
| for i, task in enumerate(taskseq): | ||
| if thread._state: | ||
| debug('task handler found thread._state != RUN') | ||
| break | ||
| put(task) | ||
| if setlength: | ||
| setlength(i+1) | ||
| else: | ||
| if setlength: | ||
| debug('doing setlength()') | ||
| setlength(i+1) | ||
| continue | ||
| break | ||
| else: | ||
| debug('task handler got None') | ||
| debug('task handler got sentinel') | ||
| # tell result handler to finish when cache is empty | ||
| outqueue.put(None) | ||
| # tell workers there is no more work | ||
| debug('task handler sending None to workers') | ||
| debug('task handler sending sentinel to workers') | ||
| for p in pool: | ||
| put(None) | ||
| # tell result handler to finish when cache is empty | ||
| outqueue.put(None) | ||
| debug('task handler exiting') | ||
@@ -238,8 +244,8 @@ | ||
| else: | ||
| debug('result handler got None') | ||
| debug('result handler got sentinel') | ||
| while cache and thread._state == RUN: | ||
| while cache and thread._state != TERMINATE: | ||
| item = get() | ||
| if item is None: | ||
| debug('result handler ignoring None') | ||
| debug('result handler ignoring extra sentinel') | ||
| job, i, obj = item | ||
@@ -276,2 +282,4 @@ try: | ||
| shutdown = terminate # depracated alias | ||
| def join(self): | ||
@@ -285,6 +293,4 @@ debug('joining pool') | ||
| shutdown = terminate # depracated alias | ||
| @staticmethod | ||
| def _terminate_pool(taskqueue, outqueue, cache, pool, | ||
| def _terminate_pool(taskqueue, inqueue, outqueue, cache, pool, | ||
| task_handler, result_handler): | ||
@@ -300,12 +306,27 @@ debug('finalizing pool') | ||
| result_handler._state = TERMINATE | ||
| debug('sending sentinels') | ||
| taskqueue.put(None) | ||
| outqueue.put(None) | ||
| debug('getting read lock on inqueue') | ||
| inqueue._rlock.acquire() | ||
| debug('terminating pool workers') | ||
| debug('terminating workers') | ||
| for p in pool: | ||
| if p.isAlive(): | ||
| p.terminate() | ||
| p.terminate() | ||
| if task_handler.isAlive(): | ||
| debug('removing tasks from inqueue until task handler finished') | ||
| while task_handler.isAlive() and inqueue._reader.poll(): | ||
| inqueue._reader.recv() | ||
| time.sleep(0) | ||
| debug('joining result handler') | ||
| result_handler.join() | ||
| debug('joining task handler') | ||
| task_handler.join() | ||
| result_handler.join() | ||
| debug('joining pool workers') | ||
| for p in pool: | ||
| p.join() | ||
@@ -312,0 +333,0 @@ # |
+20
-21
@@ -56,3 +56,2 @@ # | ||
| def activeChildren(): | ||
@@ -111,4 +110,4 @@ ''' | ||
| self._target = target | ||
| self._args = args | ||
| self._kwargs = kwargs | ||
| self._args = tuple(args) | ||
| self._kwargs = kwargs.copy() | ||
| self._stoppable = False | ||
@@ -668,3 +667,3 @@ self._parent_pid = os.getpid() | ||
| @staticmethod | ||
| def _run_all_finalizers(): | ||
| def _run_all_finalizers(minpriority=None): | ||
| ''' | ||
@@ -674,14 +673,17 @@ Run remaining callbacks (in reverse order of registration) | ||
| Finalize._exiting = True | ||
| L = Finalize._registry.keys() | ||
| L.sort(key=lambda f: f._orderkey, reverse=True) | ||
| if minpriority is None: | ||
| minkey = (-sys.maxint, 0) | ||
| else: | ||
| minkey = (minpriority, 0) | ||
| for finalizer in L: | ||
| if finalizer._atexit: | ||
| try: | ||
| finalizer() | ||
| except Exception: | ||
| import traceback | ||
| traceback.print_exc() | ||
| finalizers = [f for f in Finalize._registry.keys() | ||
| if f._atexit and f._orderkey >= minkey] | ||
| finalizers.sort(key=lambda f: f._orderkey, reverse=True) | ||
| _trylater = [] | ||
| for finalizer in finalizers: | ||
| try: | ||
| finalizer() | ||
| except Exception: | ||
| import traceback | ||
| traceback.print_exc() | ||
@@ -693,4 +695,4 @@ # | ||
| def _exit_func(): | ||
| info('running all "atexit" finalizers') | ||
| Finalize._run_all_finalizers() | ||
| info('running all "atexit" finalizers with priority >= 0') | ||
| Finalize._run_all_finalizers(0) | ||
@@ -718,8 +720,5 @@ for p in activeChildren(): | ||
| for (func, args) in _trylater: | ||
| try: | ||
| func(*args) | ||
| except: | ||
| info('failed to do "trylater" call: %s', (func, args)) | ||
| info('running all "atexit" finalizers with priority < 0') | ||
| Finalize._run_all_finalizers() | ||
| atexit.register(_exit_func) |
+55
-58
@@ -22,2 +22,3 @@ # | ||
| from processing.process import debug, Finalize | ||
| from processing.connection import Pipe | ||
| from Queue import Empty, Full | ||
@@ -39,13 +40,3 @@ | ||
| def __init__(self, maxsize=0): | ||
| if sys.platform == 'win32': | ||
| from processing.connection import Listener, Client | ||
| l = Listener() | ||
| reader = Client(l.address) | ||
| writer = l.accept() | ||
| else: | ||
| rfd, wfd = os.pipe() | ||
| reader = _processing.SocketConnection(rfd) | ||
| writer = _processing.SocketConnection(wfd) | ||
| os.close(rfd), os.close(wfd) | ||
| reader, writer = Pipe(duplex=False) | ||
| rlock = Lock() | ||
@@ -73,2 +64,3 @@ wlock = Lock() | ||
| self._thread = None | ||
| self._joincancelled = False | ||
@@ -162,5 +154,5 @@ def __getstate__(self): | ||
| def close(self): | ||
| self._closed = True | ||
| if self._close: | ||
| self._close() | ||
| self._closed = True | ||
@@ -171,5 +163,9 @@ def jointhread(self): | ||
| self._jointhread() | ||
| def canceljoin(self): | ||
| self._jointhread.cancel() | ||
| self._joincancelled = True | ||
| try: | ||
| self._jointhread.cancel() | ||
| except AttributeError: | ||
| pass | ||
@@ -185,2 +181,16 @@ def _startthread(self): | ||
| # On process exit we will wait for data to be flushed to pipe. | ||
| # | ||
| # However, if this process created the queue then all | ||
| # processes which use the queue will be descendants of this | ||
| # process. Therefore waiting for the queue to be flushed | ||
| # is pointless once all the child processes have been joined. | ||
| created_by_this_process = (self._opid == os.getpid()) | ||
| if not self._joincancelled and not created_by_this_process: | ||
| self._jointhread = Finalize( | ||
| self._thread, Queue._finalize_join, | ||
| [weakref.ref(self._thread)], | ||
| priority=-5, atexit=True | ||
| ) | ||
| # Send sentinel to the thread queue object is garbage collected | ||
@@ -193,13 +203,2 @@ self._close = Finalize( | ||
| # Flush data to pipe | ||
| self._jointhread = Finalize( | ||
| self._thread, Queue._finalize_join, | ||
| [weakref.ref(self._thread)], | ||
| atexit=True | ||
| ) | ||
| # Don't bother joining on exit if this process created the queue | ||
| if self._opid == os.getpid(): | ||
| self._jointhread.cancel() | ||
| @staticmethod | ||
@@ -228,23 +227,32 @@ def _finalize_join(twr): | ||
| debug('starting thread to feed data to pipe') | ||
| while 1: | ||
| notempty.acquire() | ||
| try: | ||
| if not buffer: | ||
| notempty.wait() | ||
| finally: | ||
| notempty.release() | ||
| try: | ||
| while 1: | ||
| obj = buffer.popleft() | ||
| if obj is _sentinel: | ||
| debug('feeder thread got sentinel -- exiting') | ||
| return | ||
| try: | ||
| while 1: | ||
| notempty.acquire() | ||
| try: | ||
| if not buffer: | ||
| notempty.wait() | ||
| finally: | ||
| notempty.release() | ||
| try: | ||
| while 1: | ||
| obj = buffer.popleft() | ||
| if obj is _sentinel: | ||
| debug('feeder thread got sentinel -- exiting') | ||
| return | ||
| writelock.acquire() | ||
| try: | ||
| send(obj) | ||
| finally: | ||
| writelock.release() | ||
| except IndexError: | ||
| pass | ||
| writelock.acquire() | ||
| try: | ||
| send(obj) | ||
| finally: | ||
| writelock.release() | ||
| except IndexError: | ||
| pass | ||
| except Exception, e: | ||
| # Since this runs in a daemon thread the objects it uses | ||
| # may be become unusable while the process is cleaning up. | ||
| # We ignore errors which happen after the process has | ||
| # started to cleanup. | ||
| debug('error in queue thread: %s', e) | ||
| if not Finalize._exiting: | ||
| raise | ||
@@ -260,15 +268,4 @@ _sentinel = object() | ||
| def __init__(self): | ||
| if sys.platform == 'win32': | ||
| from processing.connection import Listener, Client | ||
| l = Listener() | ||
| reader = Client(l.address) | ||
| writer = l.accept() | ||
| state = reader, writer, Lock(), None | ||
| else: | ||
| rfd, wfd = os.pipe() | ||
| reader = _processing.SocketConnection(rfd) | ||
| writer = _processing.SocketConnection(wfd) | ||
| os.close(rfd), os.close(wfd) | ||
| state = reader, writer, Lock(), Lock() | ||
| reader, writer = Pipe(duplex=False) | ||
| state = reader, writer, Lock(), Lock() | ||
| self.__setstate__(state) | ||
@@ -275,0 +272,0 @@ |
+27
-5
@@ -0,1 +1,3 @@ | ||
| .. include:: doc/version.txt | ||
| =================== | ||
@@ -8,5 +10,5 @@ Python processing | ||
| :Url: http://developer.berlios.de/projects/pyprocessing | ||
| :Version: |version| | ||
| :Licence: BSD Licence | ||
| ``processing`` is a package for the Python language which supports the | ||
@@ -16,8 +18,17 @@ spawning of processes using the API of the standard library's | ||
| Objects can be transferred between processes using pipes or queues, | ||
| and objects can be shared between processes using a server process or | ||
| (for simple data) shared memory. Equivalents of the synchronization | ||
| primitives in ``threading`` are also provided. | ||
| *Features*: | ||
| * Objects can be transferred between processes using pipes or | ||
| multi-producer/multi-consumer queues. | ||
| * Objects can be shared between processes using a server process or | ||
| (for simple data) shared memory. | ||
| * Equivalents of all the synchronization primitives in ``threading`` | ||
| are available. | ||
| * A ``Pool`` class makes it easy to submit tasks to a pool of worker | ||
| processes. | ||
| Links | ||
@@ -84,3 +95,14 @@ ===== | ||
| Tasks can be offloaded to a pool of worker processes in various ways, | ||
| for example :: | ||
| >>> from processing import Pool | ||
| >>> def f(x): return x*x | ||
| ... | ||
| >>> p = Pool(4) | ||
| >>> result = p.map_async(f, range(10)) | ||
| >>> print result.get(timeout=1) | ||
| [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] | ||
| .. raw:: html | ||
@@ -87,0 +109,0 @@ |
+4
-3
@@ -14,4 +14,3 @@ # | ||
| import os, sys, copy_reg, socket | ||
| import _processing | ||
| import process | ||
| from processing import _processing, process | ||
@@ -83,3 +82,5 @@ # | ||
| handle = rebuild_handle(reduced_handle) | ||
| return _processing.PipeConnection(handle) | ||
| conn = _processing.PipeConnection(handle) | ||
| _processing.CloseHandle(handle) | ||
| return conn | ||
@@ -86,0 +87,0 @@ copy_reg.pickle(_processing.PipeConnection, reduce_pipe_connection) |
+1
-5
@@ -137,7 +137,3 @@ # | ||
| data = ['*.txt', '*.html', 'doc/*.html', 'doc/*.css'] + glob('test/*.py') | ||
| if sys.version_info < (2, 5, 0): | ||
| data = [x for x in data if 'test_with.py' not in x] | ||
| data = ['*.txt', '*.html', 'doc/*.html', 'doc/*.css', 'test/*.py'] | ||
| kwds['package_data'] = {'processing': data} | ||
@@ -144,0 +140,0 @@ |
+45
-27
@@ -1,2 +0,2 @@ | ||
| /* | ||
| /* | ||
| * Definition of a `Connection` type. | ||
@@ -37,7 +37,7 @@ * Used by `socket_connection.h` and `pipe_connection.h`. | ||
| char *buffer; | ||
| int length; | ||
| int res; | ||
| Py_ssize_t length; | ||
| CHECKHANDLE(self); | ||
| if (!PyArg_ParseTuple(args, "s#", &buffer, &length)) | ||
@@ -60,3 +60,3 @@ return NULL; | ||
| char *freeme = NULL; | ||
| int res; | ||
| Py_ssize_t nbytes; | ||
| PyObject *result = NULL; | ||
@@ -67,12 +67,12 @@ | ||
| Py_BEGIN_ALLOW_THREADS | ||
| res = recv_string(self->handle, self->buffer, BUFFER_SIZE, &freeme); | ||
| nbytes = recv_string(self->handle, self->buffer, BUFFER_SIZE, &freeme); | ||
| Py_END_ALLOW_THREADS | ||
| if (res < 0) { | ||
| SetExcFromNumber(res); | ||
| if (nbytes < 0) { | ||
| SetExcFromNumber(nbytes); | ||
| } else { | ||
| if (freeme == NULL) { | ||
| result = Py_BuildValue("s#", self->buffer, res); | ||
| result = Py_BuildValue("s#", self->buffer, nbytes); | ||
| } else { | ||
| result = Py_BuildValue("s#", freeme, res); | ||
| result = Py_BuildValue("s#", freeme, nbytes); | ||
| free(freeme); | ||
@@ -89,3 +89,4 @@ } | ||
| char *freeme = NULL, *buffer = NULL; | ||
| int res, length; | ||
| Py_ssize_t nbytes, length; | ||
| int offset=0; | ||
| PyObject *result = NULL; | ||
@@ -95,19 +96,30 @@ | ||
| if (!PyArg_ParseTuple(args, "w#", &buffer, &length)) | ||
| if (!PyArg_ParseTuple(args, "w#|i", &buffer, &length, &offset)) | ||
| return NULL; | ||
| if (offset < 0) { | ||
| PyErr_SetString(PyExc_ValueError, "negative offset"); | ||
| return NULL; | ||
| } | ||
| if (offset > length) { | ||
| PyErr_SetString(PyExc_ValueError, "offset too large"); | ||
| return NULL; | ||
| } | ||
| Py_BEGIN_ALLOW_THREADS | ||
| res = recv_string(self->handle, buffer, length, &freeme); | ||
| nbytes = recv_string(self->handle, buffer+offset, length-offset, &freeme); | ||
| Py_END_ALLOW_THREADS | ||
| if (res < 0) { | ||
| SetExcFromNumber(res); | ||
| if (nbytes < 0) { | ||
| SetExcFromNumber(nbytes); | ||
| } else { | ||
| if (freeme == NULL) { | ||
| result = Py_BuildValue("i", res); | ||
| result = Py_BuildValue("i", nbytes); | ||
| } else { | ||
| result = PyObject_CallFunction(BufferTooShort, "s#", freeme, res); | ||
| result = PyObject_CallFunction(BufferTooShort, | ||
| "s#", freeme, nbytes); | ||
| free(freeme); | ||
| PyErr_SetObject(BufferTooShort, result); | ||
| Py_DECREF(result); | ||
| Py_XDECREF(result); | ||
| return NULL; | ||
@@ -121,3 +133,3 @@ } | ||
| /* | ||
| * Functions for transferringn objects | ||
| * Functions for transferring objects | ||
| */ | ||
@@ -129,3 +141,4 @@ | ||
| char *buffer; | ||
| int length, res; | ||
| int res; | ||
| Py_ssize_t length; | ||
| PyObject *obj = NULL, *pickled_string = NULL; | ||
@@ -146,4 +159,9 @@ | ||
| if (length > 0x7fffffff) { | ||
| PyErr_SetString(PyExc_ValueError, "string too long"); | ||
| goto ERR; | ||
| } | ||
| Py_BEGIN_ALLOW_THREADS | ||
| res = send_string(self->handle, buffer, length); | ||
| res = send_string(self->handle, buffer, (int)length); | ||
| Py_END_ALLOW_THREADS | ||
@@ -159,3 +177,3 @@ | ||
| Py_XDECREF(pickled_string); | ||
| return FALSE; | ||
| return NULL; | ||
| } | ||
@@ -167,3 +185,3 @@ | ||
| char *freeme = NULL; | ||
| int res; | ||
| Py_ssize_t nbytes; | ||
| PyObject *result = NULL; | ||
@@ -174,14 +192,14 @@ | ||
| Py_BEGIN_ALLOW_THREADS | ||
| res = recv_string(self->handle, self->buffer, BUFFER_SIZE, &freeme); | ||
| nbytes = recv_string(self->handle, self->buffer, BUFFER_SIZE, &freeme); | ||
| Py_END_ALLOW_THREADS | ||
| if (res < 0) { | ||
| SetExcFromNumber(res); | ||
| if (nbytes < 0) { | ||
| SetExcFromNumber(nbytes); | ||
| } else { | ||
| if (freeme == NULL) { | ||
| result = PyObject_CallFunction(loadsFunction, "s#", | ||
| self->buffer, res); | ||
| self->buffer, nbytes); | ||
| } else { | ||
| result = PyObject_CallFunction(loadsFunction, "s#", | ||
| freeme, res); | ||
| freeme, nbytes); | ||
| free(freeme); | ||
@@ -188,0 +206,0 @@ } |
@@ -9,11 +9,5 @@ /* | ||
| #include <Python.h> | ||
| #include "processing_defs.h" | ||
| #include "structmember.h" | ||
| #ifndef Py_RETURN_NONE | ||
| #define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None | ||
| #define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True | ||
| #define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False | ||
| #endif | ||
| extern PyObject *dumpsFunction, *loadsFunction; | ||
@@ -20,0 +14,0 @@ extern PyObject *BufferTooShort; |
+18
-7
@@ -15,7 +15,7 @@ /* | ||
| #define TOO_LONG(n) (n > 0x7fffffff) | ||
| #define INVALID_HANDLE NULL | ||
| typedef HANDLE _HANDLE; | ||
| typedef unsigned uint32_t; | ||
| #define INVALID_HANDLE NULL | ||
| static _HANDLE | ||
@@ -42,2 +42,3 @@ _duplicate(_HANDLE h) | ||
| #define ALREADY_SET_ERROR (-3) | ||
| #define BAD_MESSAGE_LENGTH (-7) | ||
@@ -56,2 +57,5 @@ static PyObject * | ||
| break; | ||
| case BAD_MESSAGE_LENGTH: | ||
| PyErr_SetString(PyExc_IOError, "bad message length"); | ||
| break; | ||
| default: | ||
@@ -72,5 +76,9 @@ PyErr_SetString(PyExc_AssertionError, "unknown error number"); | ||
| if (TOO_LONG(length)) | ||
| return BAD_MESSAGE_LENGTH; | ||
| if (!WriteFile(pipe, string, length, &amount_written, NULL)) | ||
| return STANDARD_ERROR; | ||
| assert(length == amount_written); | ||
| /* assert(length == amount_written); */ | ||
| return SUCCESS; | ||
@@ -85,3 +93,3 @@ } | ||
| static int | ||
| static Py_ssize_t | ||
| recv_string(_HANDLE pipe, char *buffer, size_t buflength, char **newbuffer) | ||
@@ -94,2 +102,4 @@ { | ||
| if (ReadFile(pipe, buffer, buflength, &length, NULL)) { | ||
| if (TOO_LONG(length)) | ||
| return BAD_MESSAGE_LENGTH; | ||
| return length; | ||
@@ -103,4 +113,5 @@ } else if (GetLastError() != ERROR_MORE_DATA) { | ||
| full_length = length + left; | ||
| assert(full_length > 0); | ||
| full_length = length + left; | ||
| if (TOO_LONG(full_length)) | ||
| return BAD_MESSAGE_LENGTH; | ||
@@ -107,0 +118,0 @@ *newbuffer = malloc(full_length); |
+12
-17
@@ -9,10 +9,5 @@ /* | ||
| #include "Python.h" | ||
| #include "processing_defs.h" | ||
| #include "processing.h" | ||
| #ifndef Py_RETURN_NONE | ||
| #define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None | ||
| #define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True | ||
| #define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False | ||
| #endif | ||
| extern PyTypeObject BlockerType; | ||
@@ -131,23 +126,23 @@ extern PyTypeObject SocketConnectionType; | ||
| #endif /* !NO_SENDFD */ | ||
| static PyMethodDef module_methods[] = { | ||
| #if !NO_SENDFD | ||
| {"sendfd", processing_sendfd, METH_VARARGS, | ||
| "sendfd(sockfd, fd): send file descriptor given by fd over\n" | ||
| "sendfd(sockfd, fd) -> None: send file descriptor given by fd over\n" | ||
| "the unix domain socket whose file decriptor is sockfd"}, | ||
| {"recvfd", processing_recvfd, METH_VARARGS, | ||
| "recvfd(sockfd): returns a file descriptor over\n" | ||
| "recvfd(sockfd) -> fd: returns a file descriptor over\n" | ||
| "a unix domain socket whose file decriptor is sockfd"}, | ||
| {NULL, NULL, 0, NULL} | ||
| }; | ||
| #endif | ||
| {"rwbuffer", processing_rwbuffer, METH_VARARGS, | ||
| "rwbuffer(object [, offset[, size]]) -> read-write buffer"}, | ||
| #else /* NO_SENDFD */ | ||
| {"address_of_buffer", processing_address_of_buffer, METH_O, | ||
| "address_of_buffer(obj) -> (address, size)"}, | ||
| static PyMethodDef module_methods[] = { | ||
| {NULL, NULL, 0, NULL} | ||
| }; | ||
| #endif /* NO_SENDFD */ | ||
| PyMODINIT_FUNC | ||
@@ -154,0 +149,0 @@ init_processing(void) |
+13
-9
@@ -1,10 +0,12 @@ | ||
| #include "Python.h" | ||
| #include "structmember.h" | ||
| /* | ||
| * A type which wraps a posix queue | ||
| * | ||
| * posix_queue.c | ||
| * | ||
| * Copyright (c) 2006, 2007, R Oudkerk --- see COPYING.txt | ||
| */ | ||
| #ifndef Py_RETURN_NONE | ||
| #define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None | ||
| #define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True | ||
| #define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False | ||
| #endif | ||
| #include "processing_defs.h" | ||
| #include "structmember.h" | ||
| #include <mqueue.h> | ||
@@ -205,3 +207,4 @@ #include <fcntl.h> | ||
| { | ||
| int res, length, block = 1; | ||
| int res, block = 1; | ||
| Py_ssize_t length; | ||
| char *string; | ||
@@ -282,3 +285,4 @@ unsigned priority = 1; | ||
| { | ||
| int bytes, block = 1; | ||
| int block = 1; | ||
| Py_ssize_t bytes; | ||
| char *buffer; | ||
@@ -285,0 +289,0 @@ PyObject *result, *timeout_obj = Py_None; |
| /* | ||
| * A type which wraps a posix named semaphore | ||
| * | ||
| * win_processing.c | ||
| * posix_processing.c | ||
| * | ||
@@ -9,13 +9,6 @@ * Copyright (c) 2006, 2007, R Oudkerk --- see COPYING.txt | ||
| #include "Python.h" | ||
| #include "processing_defs.h" | ||
| #include "pythread.h" | ||
| #ifndef Py_RETURN_NONE | ||
| #define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None | ||
| #define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True | ||
| #define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False | ||
| #endif | ||
| #include <semaphore.h> | ||
| #include <pthread.h> | ||
| #include <fcntl.h> | ||
@@ -22,0 +15,0 @@ #include <time.h> |
@@ -9,11 +9,4 @@ /* | ||
| #include <Python.h> | ||
| #include "structmember.h" | ||
| #include "processing_defs.h" | ||
| #ifndef Py_RETURN_NONE | ||
| #define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None | ||
| #define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True | ||
| #define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False | ||
| #endif | ||
| extern PyObject *dumpsFunction, *loadsFunction; | ||
@@ -49,3 +42,2 @@ extern PyObject *BufferTooShort; | ||
| /* should probably use "n" format for fd on Python 2.5 */ | ||
| if (!PyArg_ParseTuple(args, "Oiii|i", &s, &fd, &family, &type, &proto)) | ||
@@ -52,0 +44,0 @@ return NULL; |
+39
-39
@@ -12,2 +12,4 @@ /* | ||
| #define TOO_LONG(n) (n > 0x7fffffff) | ||
| #ifdef MS_WINDOWS | ||
@@ -19,4 +21,2 @@ | ||
| #define FALSE_SOCKET | ||
| #define WIN32_LEAN_AND_MEAN | ||
@@ -70,3 +70,2 @@ #include <winsock2.h> | ||
| /* | ||
@@ -83,3 +82,3 @@ * Error values | ||
| #define EARLY_END_OF_FILE (-6) | ||
| #define NEGATIVE_MESSAGE_LENGTH (-7) | ||
| #define BAD_MESSAGE_LENGTH (-7) | ||
| #define SOME_SOCKET_ERROR (-8) | ||
@@ -110,12 +109,8 @@ | ||
| break; | ||
| case SELECT_ERROR: | ||
| PyErr_SetString(PyExc_IOError, "select failed"); | ||
| case BAD_MESSAGE_LENGTH: | ||
| PyErr_SetString(PyExc_IOError, "message length is bad"); | ||
| break; | ||
| case NEGATIVE_MESSAGE_LENGTH: | ||
| PyErr_SetString(PyExc_IOError, "message length is negative"); | ||
| break; | ||
| #ifdef MS_WINDOWS | ||
| case SOME_SOCKET_ERROR: | ||
| PyErr_Format(PyExc_IOError, "windows socket error %d", | ||
| WSAGetLastError()); | ||
| PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError()) ; | ||
| break; | ||
@@ -137,3 +132,3 @@ #endif | ||
| char *p = string; | ||
| int res; | ||
| Py_ssize_t res; | ||
@@ -158,3 +153,4 @@ while (length > 0) { | ||
| { | ||
| int remaining = length, temp; | ||
| size_t remaining = length; | ||
| Py_ssize_t temp; | ||
| char *p = buffer; | ||
@@ -166,3 +162,3 @@ | ||
| if (temp == 0) | ||
| return remaining == (int)length | ||
| return remaining == length | ||
| ? END_OF_FILE : EARLY_END_OF_FILE; | ||
@@ -186,11 +182,11 @@ else | ||
| { | ||
| if (length < 16384) { | ||
| char *message, *p; | ||
| if (length < 0x4000) { | ||
| char *message; | ||
| int res; | ||
| p = message = malloc(length+4); | ||
| if (p == NULL) | ||
| message = malloc(length+4); | ||
| if (message == NULL) | ||
| return MEMORY_ERROR; | ||
| *(uint32_t*)message = htonl(length); | ||
| *(uint32_t*)message = htonl((uint32_t)length); | ||
| memcpy(message+4, string, length); | ||
@@ -201,5 +197,9 @@ res = _sendall(h, message, length+4); | ||
| } else { | ||
| char lenbuff[4]; | ||
| *(uint32_t*)lenbuff = htonl(length); | ||
| return _sendall(h, lenbuff, 4) || _sendall(h, string, length); | ||
| uint32_t lenbuff; | ||
| if (TOO_LONG(length)) | ||
| return BAD_MESSAGE_LENGTH; | ||
| lenbuff = htonl((uint32_t)length); | ||
| return _sendall(h, (char*)&lenbuff, 4) || _sendall(h, string, length); | ||
| } | ||
@@ -214,27 +214,27 @@ } | ||
| static int | ||
| static Py_ssize_t | ||
| recv_string(_HANDLE h, char *buffer, size_t buflength, char **newbuffer) | ||
| { | ||
| int res, length; | ||
| char lenbuff[4]; | ||
| int res; | ||
| uint32_t ulength; | ||
| *newbuffer = NULL; | ||
| res = _recvall(h, lenbuff, 4); | ||
| res = _recvall(h, (char*)&ulength, 4); | ||
| if (res < 0) | ||
| return res; | ||
| length = htonl(*(uint32_t*) lenbuff); | ||
| if (length < 0) | ||
| return NEGATIVE_MESSAGE_LENGTH; | ||
| if (length <= (int)buflength) { | ||
| res = _recvall(h, buffer, length); | ||
| return res < 0 ? res : length; | ||
| ulength = ntohl(ulength); | ||
| if (TOO_LONG(ulength)) | ||
| return BAD_MESSAGE_LENGTH; | ||
| if (ulength <= buflength) { | ||
| res = _recvall(h, buffer, (size_t)ulength); | ||
| return res < 0 ? res : ulength; | ||
| } else { | ||
| *newbuffer = malloc(length > 0 ? length : 1); | ||
| *newbuffer = malloc(ulength > 0 ? (size_t)ulength : 1); | ||
| if (*newbuffer == NULL) | ||
| return MEMORY_ERROR; | ||
| res = _recvall(h, *newbuffer, length); | ||
| return res < 0 ? res : length; | ||
| res = _recvall(h, *newbuffer, (size_t)ulength); | ||
| return res < 0 ? (Py_ssize_t)res : (Py_ssize_t)ulength; | ||
| } | ||
@@ -260,4 +260,4 @@ } | ||
| tv.tv_sec = (int)timeout; | ||
| tv.tv_usec = (int)((timeout - tv.tv_sec) * 1e6); | ||
| tv.tv_sec = (long)timeout; | ||
| tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6); | ||
@@ -268,3 +268,3 @@ res = select(fd+1, &rfds, NULL, NULL, &tv); | ||
| #ifdef MS_WINDOWS | ||
| return SELECT_ERROR; | ||
| return SOME_SOCKET_ERROR; | ||
| #else | ||
@@ -271,0 +271,0 @@ return STANDARD_ERROR; |
+46
-32
@@ -9,13 +9,10 @@ /* | ||
| #include "Python.h" | ||
| #include "processing_defs.h" | ||
| #include "processing.h" | ||
| #ifndef Py_RETURN_NONE | ||
| #define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None | ||
| #define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True | ||
| #define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False | ||
| #endif | ||
| #define WIN32_LEAN_AND_MEAN | ||
| #include <windows.h> | ||
| #define PIPE_BUFFER_SIZE 8192 | ||
| extern PyTypeObject BlockerType; | ||
@@ -33,6 +30,4 @@ extern PyTypeObject PipeConnectionType; | ||
| #define PIPE_BUFFER_SIZE 8192 | ||
| static PyObject * | ||
| processing_createpipe(PyObject *self, PyObject *args) | ||
| processing_createpipe(PyObject *self, PyObject *args, PyObject *kwds) | ||
| { | ||
@@ -42,11 +37,17 @@ char *name; | ||
| DWORD timeout = NMPWAIT_WAIT_FOREVER; | ||
| DWORD openmode = PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT; | ||
| DWORD openmode = PIPE_ACCESS_DUPLEX; | ||
| DWORD pipemode = PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT; | ||
| if ( !PyArg_ParseTuple( args, "s|i", &name, &timeout) ) | ||
| DWORD obufsz = openmode & PIPE_ACCESS_OUTBOUND ? PIPE_BUFFER_SIZE : 0; | ||
| DWORD ibufsz = openmode & PIPE_ACCESS_INBOUND ? PIPE_BUFFER_SIZE : 0; | ||
| static char *kwlist[] = {"name", "timeout", "openmode", NULL}; | ||
| if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|ii", kwlist, | ||
| &name, &timeout, &openmode)) | ||
| return NULL; | ||
| Py_BEGIN_ALLOW_THREADS | ||
| pipe = CreateNamedPipe(name, PIPE_ACCESS_DUPLEX, openmode, | ||
| PIPE_UNLIMITED_INSTANCES, PIPE_BUFFER_SIZE, | ||
| PIPE_BUFFER_SIZE, timeout, NULL); | ||
| pipe = CreateNamedPipe(name, openmode, pipemode, PIPE_UNLIMITED_INSTANCES, | ||
| obufsz, ibufsz, timeout, NULL); | ||
| Py_END_ALLOW_THREADS | ||
@@ -56,4 +57,3 @@ | ||
| CloseHandle(pipe); | ||
| PyErr_SetFromWindowsErr(0); | ||
| return NULL; | ||
| return PyErr_SetFromWindowsErr(0); | ||
| } | ||
@@ -75,3 +75,3 @@ | ||
| if ( !PyArg_ParseTuple( args, "i", &pipe ) ) | ||
| if (!PyArg_ParseTuple(args, "i", &pipe)) | ||
| return NULL; | ||
@@ -85,4 +85,3 @@ | ||
| CloseHandle(pipe); | ||
| PyErr_SetFromWindowsErr(0); | ||
| return NULL; | ||
| return PyErr_SetFromWindowsErr(0); | ||
| } | ||
@@ -105,3 +104,3 @@ | ||
| if ( !PyArg_ParseTuple( args, "si", &name, &timeout ) ) | ||
| if (!PyArg_ParseTuple(args, "si", &name, &timeout)) | ||
| return NULL; | ||
@@ -113,6 +112,4 @@ | ||
| if (!success) { | ||
| PyErr_SetFromWindowsErr(0); | ||
| return NULL; | ||
| } | ||
| if (!success) | ||
| return PyErr_SetFromWindowsErr(0); | ||
@@ -129,9 +126,9 @@ Py_RETURN_NONE; | ||
| DWORD dwMode = PIPE_READMODE_MESSAGE; | ||
| DWORD access = GENERIC_READ | GENERIC_WRITE; | ||
| if ( !PyArg_ParseTuple( args, "s", &name ) ) | ||
| if (!PyArg_ParseTuple(args, "s|I", &name, &access)) | ||
| return NULL; | ||
| Py_BEGIN_ALLOW_THREADS | ||
| pipe = CreateFile(name, GENERIC_READ | GENERIC_WRITE, | ||
| 0, NULL, OPEN_EXISTING, 0, NULL); | ||
| pipe = CreateFile(name, access, 0, NULL, OPEN_EXISTING, 0, NULL); | ||
| if (pipe == INVALID_HANDLE_VALUE) | ||
@@ -146,4 +143,3 @@ success = FALSE; | ||
| CloseHandle(pipe); | ||
| PyErr_SetFromWindowsErr(0); | ||
| return NULL; | ||
| return PyErr_SetFromWindowsErr(0); | ||
| } | ||
@@ -163,7 +159,14 @@ | ||
| HANDLE pipe; | ||
| BOOL success; | ||
| if (!PyArg_ParseTuple(args, "i", &pipe)) | ||
| return NULL; | ||
| if (!CloseHandle(pipe)) | ||
| Py_BEGIN_ALLOW_THREADS | ||
| success = CloseHandle(pipe); | ||
| Py_END_ALLOW_THREADS | ||
| if (!success) | ||
| return PyErr_SetFromWindowsErr(0); | ||
| Py_RETURN_NONE; | ||
@@ -183,4 +186,6 @@ } | ||
| return NULL; | ||
| if (!SetConsoleCtrlHandler(NULL, !value)) | ||
| return PyErr_SetFromWindowsErr(0); | ||
| Py_RETURN_NONE; | ||
@@ -196,4 +201,6 @@ } | ||
| return NULL; | ||
| if (!GenerateConsoleCtrlEvent(event, gid)) | ||
| return PyErr_SetFromWindowsErr(0); | ||
| Py_RETURN_NONE; | ||
@@ -265,3 +272,4 @@ } | ||
| {"createpipe", (PyCFunction)processing_createpipe, METH_VARARGS, ""}, | ||
| {"createpipe", | ||
| (PyCFunction)processing_createpipe, METH_VARARGS | METH_KEYWORDS, ""}, | ||
@@ -276,2 +284,6 @@ {"connectpipe", (PyCFunction)processing_connectpipe, METH_VARARGS, ""}, | ||
| {"rwbuffer", processing_rwbuffer, METH_VARARGS, ""}, | ||
| {"address_of_buffer", processing_address_of_buffer, METH_O, ""}, | ||
| {NULL} /* Sentinel */ | ||
@@ -318,2 +330,4 @@ }; | ||
| NULL, NULL); | ||
| if (!BufferTooShort) | ||
| return; | ||
| Py_INCREF(BufferTooShort); | ||
@@ -320,0 +334,0 @@ PyModule_AddObject(m, "BufferTooShort", BufferTooShort); |
@@ -9,11 +9,5 @@ /* | ||
| #include "Python.h" | ||
| #include "processing_defs.h" | ||
| #include "pythread.h" | ||
| #ifndef Py_RETURN_NONE | ||
| #define Py_RETURN_NONE return Py_INCREF(Py_None), Py_None | ||
| #define Py_RETURN_TRUE return Py_INCREF(Py_True), Py_True | ||
| #define Py_RETURN_FALSE return Py_INCREF(Py_False), Py_False | ||
| #endif | ||
| #define WIN32_LEAN_AND_MEAN | ||
@@ -20,0 +14,0 @@ #include <windows.h> |
+4
-2
@@ -83,3 +83,5 @@ # | ||
| # `self._block._close()` will be called before `os._exit()`. | ||
| process.Finalize(self, self._block._close, atexit=True) | ||
| process.Finalize( | ||
| self, self._block._close, atexit=True, priority=-10 | ||
| ) | ||
@@ -223,3 +225,3 @@ self.acquire = self._block.acquire | ||
| try: | ||
| # wait for notification | ||
| # wait for notification or timeout | ||
| if timeout is None: | ||
@@ -226,0 +228,0 @@ self._wait_semaphore.acquire() |
+4
-5
@@ -28,4 +28,3 @@ # | ||
| from processing.test import test_processing, test_newtype, \ | ||
| test_doc, test_speed, test_connection, test_reduction, test_stop, \ | ||
| test_workers, test_pool | ||
| test_doc, test_speed, test_connection, test_reduction, test_stop | ||
@@ -41,9 +40,9 @@ old_processes = set(activeChildren()) | ||
| if HAVE_NATIVE_SEMAPHORE: | ||
| from processing.test import test_workers, test_pool | ||
| run_test(test_processing, ['processes', 'processes+server', 'threads']) | ||
| run_test(test_workers, ['processes']) | ||
| run_test(test_pool, ['processes']) | ||
| else: | ||
| run_test(test_processing, ['processes+server', 'threads']) | ||
| run_test(test_workers, ['processes']) | ||
| run_test(test_pool, ['processes']) | ||
| processes = set(activeChildren()) | ||
@@ -50,0 +49,0 @@ assert processes.issubset(old_processes) |
@@ -103,5 +103,5 @@ # | ||
| t = time.time() | ||
| C = list(pool.imap(pow3, xrange(N), chunksize=10000)) | ||
| print '\tlist(pool.imap(pow3, xrange(%d), chunksize=10000)):\n\t\t%s' \ | ||
| ' seconds' % (N, time.time() - t) | ||
| C = list(pool.imap(pow3, xrange(N), chunksize=N//8)) | ||
| print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \ | ||
| ' seconds' % (N, N//8, time.time() - t) | ||
@@ -126,5 +126,5 @@ assert A == B == C, (len(A), len(B), len(C)) | ||
| t = time.time() | ||
| C = list(pool.imap(noop, L, chunksize=10000)) | ||
| print '\tlist(pool.imap(noop, L, chunksize=10000)):\n\t\t%s seconds' % \ | ||
| (time.time() - t) | ||
| C = list(pool.imap(noop, L, chunksize=len(L)//8)) | ||
| print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \ | ||
| (len(L)//8, time.time() - t) | ||
@@ -131,0 +131,0 @@ assert A == B == C, (len(A), len(B), len(C)) |
@@ -16,3 +16,6 @@ # | ||
| from processing import * | ||
| Manager = LocalManager | ||
| try: | ||
| Manager = LocalManager | ||
| except NameError: | ||
| Manager = None | ||
| else: | ||
@@ -19,0 +22,0 @@ raise ValueError, config |
+10
-2
@@ -179,2 +179,7 @@ # | ||
| import processing | ||
| try: | ||
| from processing.sharedctypes import new_array | ||
| except ImportError: | ||
| new_array = None | ||
| manager = processing.Manager() | ||
@@ -208,7 +213,10 @@ local_manager = processing.LocalManager() | ||
| test_seqspeed(range(10)) | ||
| print '\n\t######## testing shared memory array\n' | ||
| print '\n\t######## testing LocalManager.SharedArray("i", ...)\n' | ||
| test_seqspeed(local_manager.SharedArray('i', range(10))) | ||
| print '\n\t######## testing list managed by server process\n' | ||
| test_seqspeed(manager.list(range(10))) | ||
| if new_array: | ||
| print '\n\t######## testing sharedctypes.new_array("i", ...)\n' | ||
| test_seqspeed(new_array('i', range(10))) | ||
@@ -215,0 +223,0 @@ |
+3
-2
@@ -8,5 +8,6 @@ ======== | ||
| Alexey Akimov, Michele Bertoldi, Josiah Carlson, Lisandro Dalcin, | ||
| Markus Gritsch, Doug Hellmann, Charlie Hull, Richard Jones, Gerald | ||
| John M. Manipon, Kevin Manley, Paul Rudin, Dominique Wahli. | ||
| Markus Gritsch, Doug Hellmann, Charlie Hull, Richard Jones, Alexy | ||
| Khrabrov, Gerald John M. Manipon, Kevin Manley, Paul Rudin, | ||
| Dominique Wahli. | ||
| Sorry if I have forgotten anyone. |
| # | ||
| # A test file for the processing package (and processing.dummy) | ||
| # | ||
| from __future__ import with_statement | ||
| import time, sys, random | ||
| from processing import * | ||
| from Queue import Empty | ||
| #### TEST_NAMESPACE | ||
| def namespace_func(running, mutex): | ||
| random.seed() | ||
| time.sleep(random.random()*4) | ||
| with mutex: | ||
| print '\n\t\t\t' + str(currentProcess()) + ' has finished' | ||
| running.value -= 1 | ||
| def test_namespace(manager): | ||
| TASKS = 10 | ||
| running = manager.SharedValue('i', TASKS) | ||
| mutex = manager.Lock() | ||
| for i in range(TASKS): | ||
| Process(target=namespace_func, args=[running, mutex]).start() | ||
| while running.value > 0: | ||
| time.sleep(0.08) | ||
| with mutex: | ||
| print running.value, | ||
| sys.stdout.flush() | ||
| print 'No more running processes' | ||
| #### TEST_QUEUE | ||
| def queue_func(queue): | ||
| for i in xrange(30): | ||
| time.sleep(0.5 * random.random()) | ||
| queue.put(i*i) | ||
| queue.put('STOP') | ||
| def test_queue(manager): | ||
| q = manager.Queue() | ||
| p = Process(target=queue_func, args=[q]) | ||
| p.start() | ||
| o = None | ||
| while o != 'STOP': | ||
| try: | ||
| o = q.get(timeout=0.3) | ||
| print o, | ||
| sys.stdout.flush() | ||
| except Empty: | ||
| print 'TIMEOUT' | ||
| #### TEST_CONDITION | ||
| def condition_func(cond): | ||
| with cond: | ||
| print '\t' + str(cond) | ||
| time.sleep(2) | ||
| print '\tchild is notifying' | ||
| cond.notify() | ||
| print '\t' + str(cond) | ||
| def test_condition(manager): | ||
| cond = manager.Condition() | ||
| p = Process(target=condition_func, args=[cond]) | ||
| print cond | ||
| with cond: | ||
| print cond | ||
| with cond: | ||
| print cond | ||
| p.start() | ||
| print 'main is waiting' | ||
| cond.wait() | ||
| print 'main has woken up' | ||
| print cond | ||
| print cond | ||
| p.join() | ||
| print cond | ||
| #### TEST_SEMAPHORE | ||
| def semaphore_func(sema, mutex, running): | ||
| with sema: | ||
| with mutex: | ||
| running.value += 1 | ||
| print running.value, 'tasks are running' | ||
| random.seed() | ||
| time.sleep(random.random()*2) | ||
| with mutex: | ||
| running.value -= 1 | ||
| print '%s has finished' % currentProcess() | ||
| def test_semaphore(manager): | ||
| sema = manager.Semaphore(3) | ||
| mutex = manager.RLock() | ||
| running = manager.SharedValue('i', 0) | ||
| processes = [Process(target=semaphore_func, args=[sema, mutex, running]) | ||
| for i in range(10)] | ||
| for p in processes: | ||
| p.start() | ||
| for p in processes: | ||
| p.join() | ||
| #### TEST_JOIN_TIMEOUT | ||
| def join_timeout_func(): | ||
| print '\tchild sleeping' | ||
| time.sleep(5.5) | ||
| print '\n\tchild terminating' | ||
| def test_join_timeout(manager): | ||
| p = Process(target=join_timeout_func) | ||
| p.start() | ||
| print 'waiting for process to finish' | ||
| while 1: | ||
| p.join(timeout=1) | ||
| if not p.isAlive(): | ||
| break | ||
| print '.', | ||
| sys.stdout.flush() | ||
| #### TEST_EVENT | ||
| def event_func(event): | ||
| print '\t%r is waiting' % currentProcess() | ||
| event.wait() | ||
| print '\t%r has woken up' % currentProcess() | ||
| def test_event(manager): | ||
| event = manager.Event() | ||
| processes = [Process(target=event_func, args=[event]) for i in range(5)] | ||
| for p in processes: | ||
| p.start() | ||
| print 'main is sleeping' | ||
| time.sleep(2) | ||
| print 'main is setting event' | ||
| event.set() | ||
| for p in processes: | ||
| p.join() | ||
| #### TEST_SHAREDVALUES | ||
| def sharedvalues_func(values, structs, arrays, | ||
| shared_values, shared_structs, shared_arrays): | ||
| for i in range(len(values)): | ||
| v = values[i][1] | ||
| sv = shared_values[i].value | ||
| assert v == sv | ||
| for i in range(len(structs)): | ||
| s = structs[i][1] | ||
| ss = shared_structs[i].value | ||
| assert s == ss, (s, ss) | ||
| for i in range(len(values)): | ||
| a = arrays[i][1] | ||
| sa = list(shared_arrays[i][:]) | ||
| assert a == sa | ||
| print 'Tests passed' | ||
| def test_sharedvalues(manager): | ||
| if sys.platform == 'cygwin' and hasattr(manager, '_getheap'): | ||
| print >>sys.stderr, 'cygwin does not allow resizing of mmaps' | ||
| return | ||
| values = [ | ||
| ('i', 10), | ||
| ('h', -2), | ||
| ('16p', 'hello') | ||
| ] | ||
| structs = [ | ||
| ('hd', (10, 0.75)), | ||
| ('10d', tuple(0.375 * i for i in range(10))), | ||
| ('cccc', ('a', 'b', 'c', 'd')) | ||
| ] | ||
| arrays = [ | ||
| ('i', range(100)), | ||
| ('d', [0.25 * i for i in range(100)]), | ||
| ('H', range(1000)) | ||
| ] | ||
| shared_values = [manager.SharedValue(id, v) for id, v in values] | ||
| shared_structs = [manager.SharedStruct(id, s) for id, s in structs] | ||
| shared_arrays = [manager.SharedArray(id, a) for id, a in arrays] | ||
| p = Process( | ||
| target=sharedvalues_func, | ||
| args=(values, structs, arrays, | ||
| shared_values, shared_structs, shared_arrays) | ||
| ) | ||
| p.start() | ||
| p.join() | ||
| assert p.getExitCode() == 0 | ||
| #### | ||
| def main(): | ||
| with Manager() as manager: | ||
| for func in [ test_namespace, test_queue, test_condition, | ||
| test_semaphore, test_join_timeout, test_event, | ||
| test_sharedvalues ]: | ||
| print '\n\t######## %s\n' % func.__name__ | ||
| func(manager) | ||
| ignore = activeChildren() # cleanup any old processes | ||
| info = manager._debug_info() | ||
| if info is not None: | ||
| print info | ||
| raise ValueError, 'there should be no positive refcounts left' | ||
| if __name__ == '__main__': | ||
| freezeSupport() | ||
| main() |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
535780
3.4%75
15.38%5740
0.05%